Commit ff89d6e0 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #2218 refs[t:2218] Merge r1629 to main

git-svn-id: file:///svn/toku/tokudb@16130 c7de825b-a66e-492c-adef-691d508d4ae1
parent fba65fa4
......@@ -214,8 +214,11 @@ static int toku_recover_commit (struct logtype_commit *l, RECOVER_ENV renv) {
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->txnid, &txn);
assert(r == 0);
if (txn == NULL)
if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); // cannot happen after checkpoint begin
return 0;
}
// commit the transaction
r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, l->lsn);
......@@ -239,8 +242,11 @@ static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) {
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->txnid, &txn);
assert(r == 0);
if (txn == NULL)
if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); // cannot happen after checkpoint begin
return 0;
}
// abort the transaction
r = toku_txn_abort_with_lsn(txn, recover_yield, NULL, l->lsn);
......@@ -292,7 +298,7 @@ abort_on_upgrade(DB* UU(pdb),
// Open the file if it is not already open. If it is already open, then do nothing.
static int internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags, int mode, char *fixedfname, FILENUM filenum, u_int32_t treeflags, u_int32_t descriptor_version, BYTESTRING* descriptor, int recovery_force_fcreate) {
static int internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags, int mode, char *fixedfname, FILENUM filenum, u_int32_t treeflags, u_int32_t descriptor_version, BYTESTRING* descriptor, int recovery_force_fcreate, TOKUTXN txn) {
int r;
// already open
......@@ -341,7 +347,7 @@ static int internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags,
}
int is_create = (flags & O_CREAT) != 0;
int only_create = is_create && (flags & O_TRUNC) != 0;
r = toku_brt_open_recovery(brt, fixedfname, fixedfname, is_create, only_create, renv->ct, NULL, fake_db, recovery_force_fcreate);
r = toku_brt_open_recovery(brt, fixedfname, fixedfname, is_create, only_create, renv->ct, txn, fake_db, recovery_force_fcreate);
if (r != 0) {
close_brt:;
//Note: If brt_open fails, then close_brt will NOT write a header to disk.
......@@ -360,12 +366,11 @@ close_brt:;
static int toku_recover_fopen (struct logtype_fopen *l, RECOVER_ENV renv) {
char *fixedfname = fixup_fname(&l->iname);
return internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0);
return internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0, NULL);
}
static int
maybe_do_fclose_during_recover_backward(RECOVER_ENV renv, FILENUM filenum) {
if (renv->ss.ss == SS_BACKWARD_SAW_CKPT_END) {
// close the tree
struct file_map_tuple *tuple = NULL;
int r = file_map_find(&renv->fmap, filenum, &tuple);
......@@ -379,7 +384,6 @@ maybe_do_fclose_during_recover_backward(RECOVER_ENV renv, FILENUM filenum) {
toku_free(fake_db); //Must free the DB after the brt is closed
file_map_remove(&renv->fmap, filenum);
}
}
return 0;
}
......@@ -395,15 +399,22 @@ static int toku_recover_fcreate (struct logtype_fcreate *l, RECOVER_ENV renv) {
create_dir_from_file(fixedfname);
struct scan_state *ss = &renv->ss;
int flags;
int r;
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->txnid, &txn);
assert(r == 0);
if (ss->ss == SS_BACKWARD_SAW_CKPT) {
//Treat as fopen
flags = 0;
}
else {
assert(txn != NULL); //Straddle txns cannot happen after checkpoint
assert(ss->ss == SS_FORWARD_SAW_CKPT);
flags = O_CREAT|O_TRUNC;
}
return internal_toku_recover_fopen_or_fcreate(renv, flags, l->mode, fixedfname, l->filenum, l->treeflags, l->descriptor_version, &l->descriptor, 1);
return internal_toku_recover_fopen_or_fcreate(renv, flags, l->mode, fixedfname, l->filenum, l->treeflags, l->descriptor_version, &l->descriptor, 1, txn);
}
static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER_ENV UU(renv)) {
......@@ -417,9 +428,11 @@ static int toku_recover_fdelete (struct logtype_fdelete *l, RECOVER_ENV renv) {
TOKUTXN txn = NULL;
int r = toku_txnid2txn(renv->logger, l->txnid, &txn);
assert(r == 0);
//TODO: #??? (insert ticket number) bug that could leave orphaned files here.
if (txn == NULL)
if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
return 0;
}
char *fixediname = fixup_fname(&l->iname);
{ //Skip if does not exist.
toku_struct_stat buf;
......@@ -451,8 +464,11 @@ static int toku_recover_enq_insert (struct logtype_enq_insert *l, RECOVER_ENV re
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->xid, &txn);
assert(r == 0);
if (txn == NULL)
if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
return 0;
}
DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, l->key.data, l->key.len);
toku_fill_dbt(&valdbt, l->value.data, l->value.len);
......@@ -481,6 +497,10 @@ static int toku_recover_tablelock_on_empty_table(struct logtype_tablelock_on_emp
r = toku_brt_note_table_lock(tuple->brt, txn);
assert(r == 0);
}
else {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
}
return 0;
}
......@@ -499,8 +519,11 @@ static int toku_recover_enq_delete_both (struct logtype_enq_delete_both *l, RECO
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->xid, &txn);
assert(r == 0);
if (txn == NULL)
if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
return 0;
}
DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, l->key.data, l->key.len);
toku_fill_dbt(&valdbt, l->value.data, l->value.len);
......@@ -525,8 +548,11 @@ static int toku_recover_enq_delete_any (struct logtype_enq_delete_any *l, RECOVE
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->xid, &txn);
assert(r == 0);
if (txn == NULL)
if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
return 0;
}
DBT keydbt;
toku_fill_dbt(&keydbt, l->key.data, l->key.len);
r = toku_brt_maybe_delete(tuple->brt, &keydbt, txn, TRUE, l->lsn);
......@@ -557,11 +583,9 @@ static int toku_recover_fclose (struct logtype_fclose *l, RECOVER_ENV UU(renv))
}
static int toku_recover_backward_fclose (struct logtype_fclose *l, RECOVER_ENV renv) {
if (renv->ss.ss == SS_BACKWARD_SAW_CKPT) {
// tree open
char *fixedfname = fixup_fname(&l->iname);
internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0);
}
internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0, NULL);
return 0;
}
......@@ -602,7 +626,7 @@ static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoi
return 0;
} else {
time_t tnow = time(NULL);
fprintf(stderr, "%.24s Tokudb recovery begin checkpoint at %"PRIu64" looking for %"PRIu64"\n", ctime(&tnow), l->lsn.lsn, ss->min_live_txn);
fprintf(stderr, "%.24s Tokudb recovery begin checkpoint at %"PRIu64" looking for %"PRIu64". Scanning backwards through %"PRIu64" log entries.\n", ctime(&tnow), l->lsn.lsn, ss->min_live_txn, l->lsn.lsn - ss->min_live_txn);
return 0;
}
case SS_BACKWARD_SAW_CKPT:
......@@ -645,7 +669,7 @@ static int toku_recover_fassociate (struct logtype_fassociate *UU(l), RECOVER_EN
static int toku_recover_backward_fassociate (struct logtype_fassociate *l, RECOVER_ENV renv) {
char *fixedfname = fixup_fname(&l->iname);
return internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0);
return internal_toku_recover_fopen_or_fcreate(renv, 0, 0, fixedfname, l->filenum, l->treeflags, 0, NULL, 0, NULL);
}
static int toku_recover_xstillopen (struct logtype_xstillopen *UU(l), RECOVER_ENV UU(renv)) {
......@@ -682,9 +706,12 @@ static int toku_recover_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) {
if (l->parenttxnid != 0) {
r = toku_txnid2txn(renv->logger, l->parenttxnid, &parent);
assert(r == 0);
if (parent == NULL)
if (parent == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == SS_BACKWARD_SAW_CKPT); //cannot happen after checkpoint begin
return 0;
}
}
// create a transaction and bind it to the transaction id
TOKUTXN txn = NULL;
......@@ -702,7 +729,7 @@ static int toku_recover_backward_xbegin (struct logtype_xbegin *l, RECOVER_ENV r
return 0; // ignore txns that began during the checkpoint
case SS_BACKWARD_SAW_CKPT:
assert(ss->n_live_txns > 0); // the only thing we are doing here is looking for a live txn, so there better be one
// If we got to the min, return nonzero
assert(ss->min_live_txn <= l->lsn.lsn); //Did not pass it.
if (ss->min_live_txn >= l->lsn.lsn) {
if (tokudb_recovery_trace)
fprintf(stderr, "Tokudb recovery turning around at xbegin %" PRIu64 " (%s)\n", l->lsn.lsn, recover_state(renv));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment