Commit c43970f3 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

panic brt's if recovery fails close[t:2035]

git-svn-id: file:///svn/toku/tokudb@14783 c7de825b-a66e-492c-adef-691d508d4ae1
parent de8eff2b
...@@ -3249,9 +3249,11 @@ toku_brtheader_close (CACHEFILE cachefile, void *header_v, char **malloced_error ...@@ -3249,9 +3249,11 @@ toku_brtheader_close (CACHEFILE cachefile, void *header_v, char **malloced_error
assert(list_empty(&h->live_brts)); assert(list_empty(&h->live_brts));
assert(list_empty(&h->zombie_brts)); assert(list_empty(&h->zombie_brts));
toku_brtheader_unlock(h); toku_brtheader_unlock(h);
LSN lsn = ZERO_LSN;
int r = 0; int r = 0;
if (h->fname) { //Otherwise header has never fully been created. if (h->panic) {
r = h->panic;
} else if (h->fname) { //Otherwise header has never fully been created.
LSN lsn = ZERO_LSN;
//Get LSN //Get LSN
if (oplsn_valid) { if (oplsn_valid) {
//Use recovery-specified lsn //Use recovery-specified lsn
...@@ -3285,8 +3287,8 @@ toku_brtheader_close (CACHEFILE cachefile, void *header_v, char **malloced_error ...@@ -3285,8 +3287,8 @@ toku_brtheader_close (CACHEFILE cachefile, void *header_v, char **malloced_error
} }
} }
if (malloced_error_string) *malloced_error_string = h->panic_string; if (malloced_error_string) *malloced_error_string = h->panic_string;
if (r==0) { if (r == 0) {
r=h->panic; r = h->panic;
} }
toku_brtheader_free(h); toku_brtheader_free(h);
return r; return r;
...@@ -4991,6 +4993,20 @@ LSN toku_brt_checkpoint_lsn(BRT brt) { ...@@ -4991,6 +4993,20 @@ LSN toku_brt_checkpoint_lsn(BRT brt) {
return brt->h->checkpoint_lsn; return brt->h->checkpoint_lsn;
} }
static int toku_brt_header_set_panic(struct brt_header *h, int panic, char *panic_string) {
if (h->panic == 0) {
h->panic = panic;
if (h->panic_string)
toku_free(h->panic_string);
h->panic_string = toku_strdup(panic_string);
}
return 0;
}
int toku_brt_set_panic(BRT brt, int panic, char *panic_string) {
return toku_brt_header_set_panic(brt->h, panic, panic_string);
}
//Wrapper functions for upgrading from version 10. //Wrapper functions for upgrading from version 10.
#include "backwards_10.h" #include "backwards_10.h"
void void
......
...@@ -84,6 +84,8 @@ int toku_brt_db_delay_closed (BRT brt, DB* db, int (*close_db)(DB*, u_int32_t), ...@@ -84,6 +84,8 @@ int toku_brt_db_delay_closed (BRT brt, DB* db, int (*close_db)(DB*, u_int32_t),
int toku_close_brt (BRT, TOKULOGGER, char **error_string); int toku_close_brt (BRT, TOKULOGGER, char **error_string);
int toku_close_brt_lsn (BRT brt, TOKULOGGER logger, char **error_string, BOOL oplsn_valid, LSN oplsn); int toku_close_brt_lsn (BRT brt, TOKULOGGER logger, char **error_string, BOOL oplsn_valid, LSN oplsn);
int toku_brt_set_panic(BRT brt, int panic, char *panic_string);
int toku_dump_brt (FILE *,BRT brt); int toku_dump_brt (FILE *,BRT brt);
void brt_fsync (BRT); /* fsync, but don't clear the caches. */ void brt_fsync (BRT); /* fsync, but don't clear the caches. */
......
...@@ -76,10 +76,21 @@ static void file_map_close_dictionaries(struct file_map *fmap, BOOL recovery_suc ...@@ -76,10 +76,21 @@ static void file_map_close_dictionaries(struct file_map *fmap, BOOL recovery_suc
assert(r == 0); assert(r == 0);
struct file_map_tuple *tuple = v; struct file_map_tuple *tuple = v;
assert(tuple->brt); assert(tuple->brt);
assert(recovery_succeeded); if (!recovery_succeeded) {
// don't update the brt on close
toku_brt_set_panic(tuple->brt, DB_RUNRECOVERY, "recovery failed");
}
//Logging is already back on. No need to pass LSN into close. //Logging is already back on. No need to pass LSN into close.
r = toku_close_brt(tuple->brt, 0, 0); char *error_string = NULL;
assert(r == 0); r = toku_close_brt(tuple->brt, NULL, &error_string);
if (!recovery_succeeded) {
printf("%s:%d %d %s\n", __FUNCTION__, __LINE__, r, error_string);
assert(r != 0);
} else
assert(r == 0);
if (error_string)
toku_free(error_string);
file_map_tuple_destroy(tuple); file_map_tuple_destroy(tuple);
toku_free(tuple); toku_free(tuple);
} }
......
// verify that DB_RUNRECOVERY is returned when there is a missing db file
#include <sys/stat.h>
#include "test.h"
const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN;
#define NAMEA "a.db"
const char *namea=NAMEA;
#define NAMEB "b.db"
const char *nameb=NAMEB;
// needed to get .bdb versions to compile
#ifndef DB_CLOSE_DONT_TRIM_LOG
#define DB_CLOSE_DONT_TRIM_LOG 0
#endif
static void run_test (void) {
int r;
system("rm -rf " ENVDIR);
toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
DB *dba;
r = db_create(&dba, env, 0); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = dba->close(dba, 0); CKERR(r);
DB *dbb;
r = db_create(&dbb, env, 0); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = dbb->close(dbb, 0); CKERR(r);
r = env->txn_checkpoint(env, 0, 0, 0); CKERR(r);
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = db_create(&dba, env, 0); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR(r);
r = env->txn_checkpoint(env, 0, 0, 0); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
abort();
}
static void run_recover (void) {
DB_ENV *env;
int r;
r = rename(ENVDIR "/" NAMEB, ENVDIR "/" NAMEB ".save" ); printf("r=%d error=%d\n", r, errno); CKERR(r);
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO);
assert(r == DB_RUNRECOVERY);
r = rename(ENVDIR "/" NAMEB ".save" , ENVDIR "/" NAMEB); printf("r=%d error=%d\n", r, errno); CKERR(r);
r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = env->close(env, 0); CKERR(r);
exit(0);
}
static void run_no_recover (void) {
DB_ENV *env;
int r;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags & ~DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = env->close(env, 0); CKERR(r);
exit(0);
}
const char *cmd;
BOOL do_test=FALSE, do_recover=FALSE, do_recover_only=FALSE, do_no_recover = FALSE;
static void test_parse_args (int argc, char *argv[]) {
int resultcode;
cmd = argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0], "-v") == 0) {
verbose++;
} else if (strcmp(argv[0],"-q")==0) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "--test")==0) {
do_test=TRUE;
} else if (strcmp(argv[0], "--recover") == 0) {
do_recover=TRUE;
} else if (strcmp(argv[0], "--recover-only") == 0) {
do_recover_only=TRUE;
} else if (strcmp(argv[0], "--no-recover") == 0) {
do_no_recover=TRUE;
} else if (strcmp(argv[0], "-h")==0) {
resultcode=0;
do_usage:
fprintf(stderr, "Usage:\n%s [-v|-q]* [-h] {--test | --recover } \n", cmd);
exit(resultcode);
} else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1;
goto do_usage;
}
argc--;
argv++;
}
}
int test_main (int argc, char *argv[]) {
test_parse_args(argc, argv);
if (do_test) {
run_test();
} else if (do_recover) {
run_recover();
} else if (do_recover_only) {
run_recover();
} else if (do_no_recover) {
run_no_recover();
}
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment