From 46ae1267648b84471d524949d7c70317b4dbeae9 Mon Sep 17 00:00:00 2001
From: Rich Prohaska <prohaska@tokutek.com>
Date: Tue, 16 Apr 2013 23:58:03 -0400
Subject: [PATCH] panic brt's if recovery fails close[t:2035]

git-svn-id: file:///svn/toku/tokudb@14783 c7de825b-a66e-492c-adef-691d508d4ae1
---
 newbrt/brt.c                         |  24 ++++-
 newbrt/brt.h                         |   2 +
 newbrt/recover.c                     |  17 +++-
 src/tests/recover-missing-dbfile-2.c | 131 +++++++++++++++++++++++++++
 4 files changed, 167 insertions(+), 7 deletions(-)
 create mode 100644 src/tests/recover-missing-dbfile-2.c

diff --git a/newbrt/brt.c b/newbrt/brt.c
index f59505e4168..2b63b9f47b6 100644
--- a/newbrt/brt.c
+++ b/newbrt/brt.c
@@ -3249,9 +3249,11 @@ toku_brtheader_close (CACHEFILE cachefile, void *header_v, char **malloced_error
     assert(list_empty(&h->live_brts));
     assert(list_empty(&h->zombie_brts));
     toku_brtheader_unlock(h);
-    LSN lsn = ZERO_LSN;
     int r = 0;
-    if (h->fname) { //Otherwise header has never fully been created.
+    if (h->panic) {
+        r = h->panic;
+    } else if (h->fname) { //Otherwise header has never fully been created.
+        LSN lsn = ZERO_LSN;
         //Get LSN
         if (oplsn_valid) {
             //Use recovery-specified lsn
@@ -3285,8 +3287,8 @@ toku_brtheader_close (CACHEFILE cachefile, void *header_v, char **malloced_error
         }
     }
     if (malloced_error_string) *malloced_error_string = h->panic_string;
-    if (r==0) {
-	r=h->panic;
+    if (r == 0) {
+	r = h->panic;
     }
     toku_brtheader_free(h);
     return r;
@@ -4991,6 +4993,20 @@ LSN toku_brt_checkpoint_lsn(BRT brt) {
     return brt->h->checkpoint_lsn;
 }
 
+static int toku_brt_header_set_panic(struct brt_header *h, int panic, char *panic_string) {
+    if (h->panic == 0) {
+        h->panic = panic;
+        if (h->panic_string) 
+            toku_free(h->panic_string);
+        h->panic_string = toku_strdup(panic_string);
+    }
+    return 0;
+}
+
+int toku_brt_set_panic(BRT brt, int panic, char *panic_string) {
+    return toku_brt_header_set_panic(brt->h, panic, panic_string);
+}
+
 //Wrapper functions for upgrading from version 10.
 #include "backwards_10.h"
 void
diff --git a/newbrt/brt.h b/newbrt/brt.h
index 967351f5421..3233194a883 100644
--- a/newbrt/brt.h
+++ b/newbrt/brt.h
@@ -84,6 +84,8 @@ int toku_brt_db_delay_closed (BRT brt, DB* db, int (*close_db)(DB*, u_int32_t),
 int toku_close_brt (BRT, TOKULOGGER, char **error_string);
 int toku_close_brt_lsn (BRT brt, TOKULOGGER logger, char **error_string, BOOL oplsn_valid, LSN oplsn);
 
+int toku_brt_set_panic(BRT brt, int panic, char *panic_string);
+
 int toku_dump_brt (FILE *,BRT brt);
 
 void brt_fsync (BRT); /* fsync, but don't clear the caches. */
diff --git a/newbrt/recover.c b/newbrt/recover.c
index 9ec732fa26b..2db33f419a4 100644
--- a/newbrt/recover.c
+++ b/newbrt/recover.c
@@ -76,10 +76,21 @@ static void file_map_close_dictionaries(struct file_map *fmap, BOOL recovery_suc
         assert(r == 0);
         struct file_map_tuple *tuple = v;
 	assert(tuple->brt);
-        assert(recovery_succeeded);
+        if (!recovery_succeeded) {
+            // don't update the brt on close
+            toku_brt_set_panic(tuple->brt, DB_RUNRECOVERY, "recovery failed");
+        }
         //Logging is already back on.  No need to pass LSN into close.
-        r = toku_close_brt(tuple->brt, 0, 0);
-        assert(r == 0);
+        char *error_string = NULL;
+        r = toku_close_brt(tuple->brt, NULL, &error_string);
+        if (!recovery_succeeded) {
+            printf("%s:%d %d %s\n", __FUNCTION__, __LINE__, r, error_string);
+            assert(r != 0);
+        } else
+            assert(r == 0);
+        if (error_string)
+            toku_free(error_string);
+
         file_map_tuple_destroy(tuple);
         toku_free(tuple);
     }
diff --git a/src/tests/recover-missing-dbfile-2.c b/src/tests/recover-missing-dbfile-2.c
new file mode 100644
index 00000000000..6c3c1b54878
--- /dev/null
+++ b/src/tests/recover-missing-dbfile-2.c
@@ -0,0 +1,131 @@
+// verify that DB_RUNRECOVERY is returned when there is a missing db file
+
+#include <sys/stat.h>
+#include "test.h"
+
+const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN;
+
+#define NAMEA "a.db"
+const char *namea=NAMEA;
+#define NAMEB "b.db"
+const char *nameb=NAMEB;
+
+// needed to get .bdb versions to compile
+#ifndef DB_CLOSE_DONT_TRIM_LOG
+#define DB_CLOSE_DONT_TRIM_LOG 0
+#endif
+
+static void run_test (void) {
+    int r;
+    system("rm -rf " ENVDIR);
+    toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
+    DB_ENV *env;
+
+    r = db_env_create(&env, 0);                                                         CKERR(r);
+    r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO);                      CKERR(r);
+
+    DB *dba;
+    r = db_create(&dba, env, 0);                                                        CKERR(r);
+    r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666);    CKERR(r);
+    r = dba->close(dba, 0); CKERR(r);
+
+    DB *dbb;
+    r = db_create(&dbb, env, 0);                                                        CKERR(r);
+    r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666);    CKERR(r);
+    r = dbb->close(dbb, 0); CKERR(r);
+
+    r = env->txn_checkpoint(env, 0, 0, 0);                                              CKERR(r);
+
+    DB_TXN *txn;
+    r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
+
+    r = db_create(&dba, env, 0);                                                        CKERR(r);
+    r = dba->open(dba, NULL, namea, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666);    CKERR(r);
+
+    r = db_create(&dbb, env, 0);                                                        CKERR(r);
+    r = dbb->open(dbb, NULL, nameb, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666);    CKERR(r);
+    
+    r = env->txn_checkpoint(env, 0, 0, 0);                                              CKERR(r);
+
+    r = txn->commit(txn, 0);                                                            CKERR(r);
+
+    abort();
+}
+
+static void run_recover (void) {
+    DB_ENV *env;
+    int r;
+
+    r = rename(ENVDIR "/" NAMEB, ENVDIR "/" NAMEB ".save" ); printf("r=%d error=%d\n", r, errno); CKERR(r);
+
+    r = db_env_create(&env, 0);                                                             CKERR(r);
+    r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO);
+    assert(r == DB_RUNRECOVERY);
+
+    r = rename(ENVDIR "/" NAMEB ".save" , ENVDIR "/" NAMEB); printf("r=%d error=%d\n", r, errno); CKERR(r);
+
+    r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO);             CKERR(r);
+    r = env->close(env, 0);                                                                 CKERR(r);
+    exit(0);
+}
+
+static void run_no_recover (void) {
+    DB_ENV *env;
+    int r;
+
+    r = db_env_create(&env, 0);                                                             CKERR(r);
+    r = env->open(env, ENVDIR, envflags & ~DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO);            CKERR(r);
+    r = env->close(env, 0);                                                                 CKERR(r);
+    exit(0);
+}
+
+const char *cmd;
+
+BOOL do_test=FALSE, do_recover=FALSE, do_recover_only=FALSE, do_no_recover = FALSE;
+
+static void test_parse_args (int argc, char *argv[]) {
+    int resultcode;
+    cmd = argv[0];
+    argc--; argv++;
+    while (argc>0) {
+	if (strcmp(argv[0], "-v") == 0) {
+	    verbose++;
+	} else if (strcmp(argv[0],"-q")==0) {
+	    verbose--;
+	    if (verbose<0) verbose=0;
+	} else if (strcmp(argv[0], "--test")==0) {
+	    do_test=TRUE;
+        } else if (strcmp(argv[0], "--recover") == 0) {
+            do_recover=TRUE;
+        } else if (strcmp(argv[0], "--recover-only") == 0) {
+            do_recover_only=TRUE;
+        } else if (strcmp(argv[0], "--no-recover") == 0) {
+            do_no_recover=TRUE;
+	} else if (strcmp(argv[0], "-h")==0) {
+	    resultcode=0;
+	do_usage:
+	    fprintf(stderr, "Usage:\n%s [-v|-q]* [-h] {--test | --recover } \n", cmd);
+	    exit(resultcode);
+	} else {
+	    fprintf(stderr, "Unknown arg: %s\n", argv[0]);
+	    resultcode=1;
+	    goto do_usage;
+	}
+	argc--;
+	argv++;
+    }
+}
+
+int test_main (int argc, char *argv[]) {
+    test_parse_args(argc, argv);
+    if (do_test) {
+	run_test();
+    } else if (do_recover) {
+        run_recover();
+    } else if (do_recover_only) {
+        run_recover();
+    } else if (do_no_recover) {
+        run_no_recover();
+    } 
+    return 0;
+}
-- 
2.30.9