Commit 3147c2b9 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

capture the recover-staddle-txn.c test [t:2005]

git-svn-id: file:///svn/toku/tokudb@14485 c7de825b-a66e-492c-adef-691d508d4ae1
parent 25baff39
......@@ -30,65 +30,73 @@ static void backward_scan_state_init(struct backward_scan_state *bs) {
// Map filenum to brt
// TODO why can't we use the cachetable to find by filenum?
struct file_map {
struct cf_pair {
FILENUM filenum;
struct cf_tuple {
FILENUM filenum;
CACHEFILE cf;
BRT brt; // set to zero on an fopen, but filled in when an fheader is seen.
} *cf_pairs;
int n_cf_pairs, max_cf_pairs;
char *fname;
} *cf_tuples;
int n_cf_tuples, max_cf_tuples;
};
static void file_map_init(struct file_map *fmap) {
fmap->cf_pairs = NULL;
fmap->n_cf_pairs = fmap->max_cf_pairs = 0;
fmap->cf_tuples = NULL;
fmap->n_cf_tuples = fmap->max_cf_tuples = 0;
}
static void file_map_close_dictionaries(struct file_map *fmap) {
int r;
for (int i=0; i<fmap->n_cf_pairs; i++) {
if (fmap->cf_pairs[i].brt) {
r = toku_close_brt(fmap->cf_pairs[i].brt, 0, 0);
//r = toku_cachefile_close(&cf_pairs[i].cf);
for (int i=0; i<fmap->n_cf_tuples; i++) {
struct cf_tuple *tuple = &fmap->cf_tuples[i];
if (tuple->brt) {
r = toku_close_brt(tuple->brt, 0, 0);
//r = toku_cachefile_close(&cf_tuples[i].cf);
assert(r == 0);
}
if (tuple->fname) {
toku_free(tuple->fname);
tuple->fname = NULL;
}
}
fmap->n_cf_pairs = fmap->max_cf_pairs = 0;
if (fmap->cf_pairs) {
toku_free(fmap->cf_pairs);
fmap->cf_pairs = NULL;
fmap->n_cf_tuples = fmap->max_cf_tuples = 0;
if (fmap->cf_tuples) {
toku_free(fmap->cf_tuples);
fmap->cf_tuples = NULL;
}
}
static int file_map_add (struct file_map *fmap, FILENUM fnum, CACHEFILE cf, BRT brt) {
if (fmap->cf_pairs == NULL) {
fmap->max_cf_pairs = 1;
MALLOC_N(fmap->max_cf_pairs, fmap->cf_pairs);
if (fmap->cf_pairs == NULL) {
fmap->max_cf_pairs = 0;
static int file_map_add (struct file_map *fmap, FILENUM fnum, CACHEFILE cf, BRT brt, char *fname) {
if (fmap->cf_tuples == NULL) {
fmap->max_cf_tuples = 1;
MALLOC_N(fmap->max_cf_tuples, fmap->cf_tuples);
if (fmap->cf_tuples == NULL) {
fmap->max_cf_tuples = 0;
return errno;
}
fmap->n_cf_pairs=1;
fmap->n_cf_tuples=1;
} else {
if (fmap->n_cf_pairs >= fmap->max_cf_pairs) {
struct cf_pair *new_cf_pairs = toku_realloc(fmap->cf_pairs, 2*fmap->max_cf_pairs*sizeof(struct cf_pair));
if (new_cf_pairs == NULL)
if (fmap->n_cf_tuples >= fmap->max_cf_tuples) {
struct cf_tuple *new_cf_tuples = toku_realloc(fmap->cf_tuples, 2*fmap->max_cf_tuples*sizeof(struct cf_tuple));
if (new_cf_tuples == NULL)
return errno;
fmap->cf_pairs = new_cf_pairs;
fmap->max_cf_pairs *= 2;
fmap->cf_tuples = new_cf_tuples;
fmap->max_cf_tuples *= 2;
}
fmap->n_cf_pairs++;
fmap->n_cf_tuples++;
}
fmap->cf_pairs[fmap->n_cf_pairs-1].filenum = fnum;
fmap->cf_pairs[fmap->n_cf_pairs-1].cf = cf;
fmap->cf_pairs[fmap->n_cf_pairs-1].brt = brt;
struct cf_tuple *tuple = &fmap->cf_tuples[fmap->n_cf_tuples-1];
tuple->filenum = fnum;
tuple->cf = cf;
tuple->brt = brt;
tuple->fname = fname;
return 0;
}
static int find_cachefile (struct file_map *fmap, FILENUM fnum, struct cf_pair **cf_pair) {
for (int i=0; i<fmap->n_cf_pairs; i++) {
if (fnum.fileid==fmap->cf_pairs[i].filenum.fileid) {
*cf_pair = fmap->cf_pairs+i;
static int find_cachefile (struct file_map *fmap, FILENUM fnum, struct cf_tuple **cf_tuple) {
for (int i=0; i<fmap->n_cf_tuples; i++) {
if (fnum.fileid==fmap->cf_tuples[i].filenum.fileid) {
*cf_tuple = &fmap->cf_tuples[i];
return 0;
}
}
......@@ -147,9 +155,9 @@ static void toku_recover_commit (LSN lsn, TXNID xid, RECOVER_ENV renv) {
int r;
// find the transaction by transaction id
TOKUTXN txn;
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, xid, &txn);
assert(r == 0);
assert(r == 0 && txn);
// commit the transaction
r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, lsn);
......@@ -168,9 +176,9 @@ static void toku_recover_xabort (LSN lsn, TXNID xid, RECOVER_ENV renv) {
int r;
// find the transaction by transaction id
TOKUTXN txn;
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, xid, &txn);
assert(r == 0);
assert(r == 0 && txn);
// abort the transaction
r = toku_txn_abort_with_lsn(txn, recover_yield, NULL, lsn);
......@@ -215,9 +223,10 @@ static void internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags,
int r;
// already open
struct cf_pair *pair = NULL;
r = find_cachefile(&renv->fmap, filenum, &pair);
struct cf_tuple *tuple = NULL;
r = find_cachefile(&renv->fmap, filenum, &tuple);
if (r == 0) {
assert(strcmp(tuple->fname, fixedfname) == 0);
toku_free(fixedfname);
return;
}
......@@ -252,9 +261,7 @@ static void internal_toku_recover_fopen_or_fcreate (RECOVER_ENV renv, int flags,
r = toku_brt_open(brt, fixedfname, fixedfname, (flags & O_CREAT) != 0, FALSE, renv->ct, NULL, NULL);
assert(r == 0);
toku_free(fixedfname);
file_map_add(&renv->fmap, filenum, NULL, brt);
file_map_add(&renv->fmap, filenum, NULL, brt, fixedfname);
}
static void toku_recover_fopen (LSN UU(lsn), TXNID UU(xid), BYTESTRING fname, FILENUM filenum, RECOVER_ENV renv) {
......@@ -265,12 +272,12 @@ static void toku_recover_fopen (LSN UU(lsn), TXNID UU(xid), BYTESTRING fname, FI
static int toku_recover_backward_fopen (struct logtype_fopen *l, RECOVER_ENV renv) {
if (renv->bs.bs == BS_SAW_CKPT_END) {
// close the tree
struct cf_pair *pair = NULL;
int r = find_cachefile(&renv->fmap, l->filenum, &pair);
struct cf_tuple *tuple = NULL;
int r = find_cachefile(&renv->fmap, l->filenum, &tuple);
if (r == 0) {
r = toku_close_brt(pair->brt, 0, 0);
r = toku_close_brt(tuple->brt, 0, 0);
assert(r == 0);
pair->brt=0;
tuple->brt=0;
}
}
return 0;
......@@ -289,20 +296,20 @@ static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER
}
static void toku_recover_enq_insert (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV renv) {
struct cf_pair *pair = NULL;
int r = find_cachefile(&renv->fmap, filenum, &pair);
struct cf_tuple *tuple = NULL;
int r = find_cachefile(&renv->fmap, filenum, &tuple);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
TOKUTXN txn;
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, xid, &txn);
assert(r == 0);
assert(r == 0 && txn);
DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, key.data, key.len);
toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_maybe_insert(pair->brt, &keydbt, &valdbt, txn, lsn);
r = toku_brt_maybe_insert(tuple->brt, &keydbt, &valdbt, txn, lsn);
assert(r == 0);
}
......@@ -312,20 +319,20 @@ static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), R
}
static void toku_recover_enq_delete_both (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV renv) {
struct cf_pair *pair = NULL;
int r = find_cachefile(&renv->fmap, filenum, &pair);
struct cf_tuple *tuple = NULL;
int r = find_cachefile(&renv->fmap, filenum, &tuple);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
TOKUTXN txn;
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, xid, &txn);
assert(r == 0);
assert(r == 0 && txn);
DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, key.data, key.len);
toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_maybe_delete_both(pair->brt, &keydbt, &valdbt, txn, lsn);
r = toku_brt_maybe_delete_both(tuple->brt, &keydbt, &valdbt, txn, lsn);
assert(r == 0);
}
......@@ -335,19 +342,19 @@ static int toku_recover_backward_enq_delete_both (struct logtype_enq_delete_both
}
static void toku_recover_enq_delete_any (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, RECOVER_ENV renv) {
struct cf_pair *pair = NULL;
int r = find_cachefile(&renv->fmap, filenum, &pair);
struct cf_tuple *tuple = NULL;
int r = find_cachefile(&renv->fmap, filenum, &tuple);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
TOKUTXN txn;
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, xid, &txn);
assert(r == 0);
assert(r == 0 && txn);
DBT keydbt;
toku_fill_dbt(&keydbt, key.data, key.len);
r = toku_brt_maybe_delete(pair->brt, &keydbt, txn, lsn);
r = toku_brt_maybe_delete(tuple->brt, &keydbt, txn, lsn);
assert(r == 0);
}
......@@ -356,13 +363,16 @@ static int toku_recover_backward_enq_delete_any (struct logtype_enq_delete_any *
return 0;
}
static void toku_recover_fclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum, RECOVER_ENV UU(renv)) {
struct cf_pair *pair = NULL;
int r = find_cachefile(&renv->fmap, filenum, &pair);
static void toku_recover_fclose (LSN UU(lsn), BYTESTRING fname, FILENUM filenum, RECOVER_ENV UU(renv)) {
struct cf_tuple *tuple = NULL;
int r = find_cachefile(&renv->fmap, filenum, &tuple);
if (r == 0) {
r = toku_close_brt(pair->brt, 0, 0);
char *fixedfname = fixup_fname(&fname);
assert(strcmp(tuple->fname, fixedfname) == 0);
toku_free(fixedfname);
r = toku_close_brt(tuple->brt, 0, 0);
assert(r == 0);
pair->brt=0;
tuple->brt=0;
}
}
......@@ -462,8 +472,10 @@ static int toku_recover_xbegin (LSN lsn, TXNID parent_xid, RECOVER_ENV renv) {
// lookup the parent
TOKUTXN parent = NULL;
r = toku_txnid2txn(renv->logger, parent_xid, &parent);
assert(r == 0);
if (parent_xid != 0) {
r = toku_txnid2txn(renv->logger, parent_xid, &parent);
assert(r == 0 && parent);
}
// create a transaction and bind it to the transaction id
TOKUTXN txn = NULL;
......
// TODO
#include <sys/stat.h>
#include <sys/wait.h>
#include "test.h"
const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN;
char *namea="a.db";
static void run_test (void) {
int r;
system("rm -rf " ENVDIR);
toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
DB *db;
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
DB_TXN *atxn;
r = env->txn_begin(env, NULL, &atxn, 0); CKERR(r);
DB_TXN *btxn;
r = env->txn_begin(env, atxn, &btxn, 0); CKERR(r);
// create a live transaction so that recovery needs to come here
DB_TXN *livetxn;
r = env->txn_begin(env, NULL, &livetxn, 0); CKERR(r);
DBT k={.data="a", .size=2};
DBT v={.data="a", .size=2};
r = db->put(db, btxn, &k, &v, DB_YESOVERWRITE); CKERR(r);
r = btxn->commit(btxn, 0); CKERR(r);
r = atxn->abort(atxn); CKERR(r);
r = db->close(db, 0); CKERR(r);
r = env->txn_checkpoint(env, 0, 0, 0); CKERR(r);
// abort the process
abort();
}
static void run_recover (void) {
DB_ENV *env;
int r;
// run recovery
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
// verify the data
DB *db;
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, NULL, namea, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR(r);
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBC *cursor;
r = db->cursor(db, txn, &cursor, 0); CKERR(r);
DBT k, v;
r = cursor->c_get(cursor, dbt_init_malloc(&k), dbt_init_malloc(&v), DB_FIRST);
assert(r == DB_NOTFOUND);
r = cursor->c_close(cursor); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
r = db->close(db, 0); CKERR(r);
r = env->close(env, 0); CKERR(r);
exit(0);
}
static void run_no_recover (void) {
DB_ENV *env;
int r;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags & ~DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = env->close(env, 0); CKERR(r);
exit(0);
}
const char *cmd;
BOOL do_test=FALSE, do_recover=FALSE, do_recover_only=FALSE, do_no_recover = FALSE;
static void test_parse_args (int argc, char *argv[]) {
int resultcode;
cmd = argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0], "-v") == 0) {
verbose++;
} else if (strcmp(argv[0],"-q")==0) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "--test")==0) {
do_test=TRUE;
} else if (strcmp(argv[0], "--recover") == 0) {
do_recover=TRUE;
} else if (strcmp(argv[0], "--recover-only") == 0) {
do_recover_only=TRUE;
} else if (strcmp(argv[0], "--no-recover") == 0) {
do_no_recover=TRUE;
} else if (strcmp(argv[0], "-h")==0) {
resultcode=0;
do_usage:
fprintf(stderr, "Usage:\n%s [-v|-q]* [-h] {--test | --recover } \n", cmd);
exit(resultcode);
} else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1;
goto do_usage;
}
argc--;
argv++;
}
}
int test_main (int argc, char *argv[]) {
test_parse_args(argc, argv);
if (do_test) {
run_test();
} else if (do_recover) {
run_recover();
} else if (do_recover_only) {
run_recover();
} else if (do_no_recover) {
run_no_recover();
}
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment