Commit 4042de51 authored by Barry Perlman's avatar Barry Perlman Committed by Yoni Fogel

[t:3113] During recovery, do not use latest version of rollback log file,

but use latest version that was checkpointed at or before the last complete checkpoint.

git-svn-id: file:///svn/toku/tokudb@27338 c7de825b-a66e-492c-adef-691d508d4ae1
parent 405d6513
...@@ -251,7 +251,7 @@ void toku_verify_or_set_counts(BRTNODE, BOOL); ...@@ -251,7 +251,7 @@ void toku_verify_or_set_counts(BRTNODE, BOOL);
int toku_serialize_brt_header_size (struct brt_header *h); int toku_serialize_brt_header_size (struct brt_header *h);
int toku_serialize_brt_header_to (int fd, struct brt_header *h); int toku_serialize_brt_header_to (int fd, struct brt_header *h);
int toku_serialize_brt_header_to_wbuf (struct wbuf *, struct brt_header *h, int64_t address_translation, int64_t size_translation); int toku_serialize_brt_header_to_wbuf (struct wbuf *, struct brt_header *h, int64_t address_translation, int64_t size_translation);
int toku_deserialize_brtheader_from (int fd, struct brt_header **brth); int toku_deserialize_brtheader_from (int fd, LSN required_lsn, struct brt_header **brth);
int toku_serialize_descriptor_contents_to_fd(int fd, const DESCRIPTOR desc, DISKOFF offset); int toku_serialize_descriptor_contents_to_fd(int fd, const DESCRIPTOR desc, DISKOFF offset);
void toku_serialize_descriptor_contents_to_wbuf(struct wbuf *wb, const DESCRIPTOR desc); void toku_serialize_descriptor_contents_to_wbuf(struct wbuf *wb, const DESCRIPTOR desc);
...@@ -272,7 +272,7 @@ struct brtenv { ...@@ -272,7 +272,7 @@ struct brtenv {
extern void toku_brtnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size, BOOL write_me, BOOL keep_me, BOOL for_checkpoint); extern void toku_brtnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size, BOOL write_me, BOOL keep_me, BOOL for_checkpoint);
extern int toku_brtnode_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, u_int32_t fullhash, void **brtnode_pv, long *sizep, int*dirty, void*extraargs); extern int toku_brtnode_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, u_int32_t fullhash, void **brtnode_pv, long *sizep, int*dirty, void*extraargs);
extern int toku_brt_alloc_init_header(BRT t, TOKUTXN txn); extern int toku_brt_alloc_init_header(BRT t, TOKUTXN txn);
extern int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header **header, BOOL* was_open); extern int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, LSN required_lsn, struct brt_header **header, BOOL* was_open);
extern CACHEKEY* toku_calculate_root_offset_pointer (BRT brt, u_int32_t *root_hash); extern CACHEKEY* toku_calculate_root_offset_pointer (BRT brt, u_int32_t *root_hash);
static const BRTNODE null_brtnode=0; static const BRTNODE null_brtnode=0;
......
...@@ -1656,7 +1656,8 @@ deserialize_brtheader_versioned (int fd, struct rbuf *rb, struct brt_header **br ...@@ -1656,7 +1656,8 @@ deserialize_brtheader_versioned (int fd, struct rbuf *rb, struct brt_header **br
// If that ever changes, then modify this. // If that ever changes, then modify this.
//TOKUDB_DICTIONARY_NO_HEADER means we can overwrite everything in the file AND the header is useless //TOKUDB_DICTIONARY_NO_HEADER means we can overwrite everything in the file AND the header is useless
static int static int
deserialize_brtheader_from_fd_into_rbuf(int fd, toku_off_t offset_of_header, struct rbuf *rb, u_int64_t *checkpoint_count, u_int32_t * version_p) { deserialize_brtheader_from_fd_into_rbuf(int fd, toku_off_t offset_of_header, struct rbuf *rb,
u_int64_t *checkpoint_count, LSN *checkpoint_lsn, u_int32_t * version_p) {
int r = 0; int r = 0;
const int64_t prefix_size = 8 + // magic ("tokudata") const int64_t prefix_size = 8 + // magic ("tokudata")
4 + // version 4 + // version
...@@ -1734,6 +1735,7 @@ deserialize_brtheader_from_fd_into_rbuf(int fd, toku_off_t offset_of_header, str ...@@ -1734,6 +1735,7 @@ deserialize_brtheader_from_fd_into_rbuf(int fd, toku_off_t offset_of_header, str
if (r==0) { if (r==0) {
//Load checkpoint count //Load checkpoint count
*checkpoint_count = rbuf_ulonglong(rb); *checkpoint_count = rbuf_ulonglong(rb);
*checkpoint_lsn = rbuf_lsn(rb);
//Restart at beginning during regular deserialization //Restart at beginning during regular deserialization
rb->ndone = 0; rb->ndone = 0;
} }
...@@ -1746,24 +1748,32 @@ deserialize_brtheader_from_fd_into_rbuf(int fd, toku_off_t offset_of_header, str ...@@ -1746,24 +1748,32 @@ deserialize_brtheader_from_fd_into_rbuf(int fd, toku_off_t offset_of_header, str
} }
// Read brtheader from file into struct. Read both headers and use the latest one. // Read brtheader from file into struct. Read both headers and use one.
// If a required_lsn is specified, then use the header with that lsn,
// or if a header with that lsn is not available, then use the latest
// header with an earlier lsn than required_lsn. (There may not be a header
// with required_lsn if the file did not change since the last checkpoint.)
// If a required_lsn is not specified, then use the latest header.
int int
toku_deserialize_brtheader_from (int fd, struct brt_header **brth) { toku_deserialize_brtheader_from (int fd, LSN required_lsn, struct brt_header **brth) {
struct rbuf rb_0; struct rbuf rb_0;
struct rbuf rb_1; struct rbuf rb_1;
u_int64_t checkpoint_count_0; u_int64_t checkpoint_count_0;
u_int64_t checkpoint_count_1; u_int64_t checkpoint_count_1;
LSN checkpoint_lsn_0;
LSN checkpoint_lsn_1;
int r = 0;
int r0; int r0;
int r1; int r1;
u_int32_t version_0, version_1, version = 0; u_int32_t version_0, version_1, version = 0;
{ {
toku_off_t header_0_off = 0; toku_off_t header_0_off = 0;
r0 = deserialize_brtheader_from_fd_into_rbuf(fd, header_0_off, &rb_0, &checkpoint_count_0, &version_0); r0 = deserialize_brtheader_from_fd_into_rbuf(fd, header_0_off, &rb_0, &checkpoint_count_0, &checkpoint_lsn_0, &version_0);
} }
{ {
toku_off_t header_1_off = BLOCK_ALLOCATOR_HEADER_RESERVE; toku_off_t header_1_off = BLOCK_ALLOCATOR_HEADER_RESERVE;
r1 = deserialize_brtheader_from_fd_into_rbuf(fd, header_1_off, &rb_1, &checkpoint_count_1, &version_1); r1 = deserialize_brtheader_from_fd_into_rbuf(fd, header_1_off, &rb_1, &checkpoint_count_1, &checkpoint_lsn_1, &version_1);
} }
struct rbuf *rb = NULL; struct rbuf *rb = NULL;
...@@ -1782,8 +1792,35 @@ toku_deserialize_brtheader_from (int fd, struct brt_header **brth) { ...@@ -1782,8 +1792,35 @@ toku_deserialize_brtheader_from (int fd, struct brt_header **brth) {
else lazy_assert(version_0 <= version_1); else lazy_assert(version_0 <= version_1);
} }
} }
int r = 0;
if (rb==NULL) { if (required_lsn.lsn != ZERO_LSN.lsn) { // an upper bound lsn was requested
if ((r0==0 && checkpoint_lsn_0.lsn <= required_lsn.lsn) &&
(r1 || checkpoint_lsn_1.lsn > required_lsn.lsn)) {
rb = &rb_0;
version = version_0;
}
else if ((r1==0 && checkpoint_lsn_1.lsn <= required_lsn.lsn) &&
(r0 || checkpoint_lsn_0.lsn > required_lsn.lsn)) {
rb = &rb_1;
version = version_1;
}
else if (r0==0 && r1==0) { // read two good headers and both have qualified lsn
if (checkpoint_lsn_0.lsn > checkpoint_lsn_1.lsn) {
rb = &rb_0;
version = version_0;
}
else {
rb = &rb_1;
version = version_1;
}
}
else {
rb = NULL;
r = -1; // may need new error code for unable to get required version of file
}
}
if (rb==NULL && r==0) {
// We were unable to read either header or at least one is too new. // We were unable to read either header or at least one is too new.
// Certain errors are higher priority than others. Order of these if/else if is important. // Certain errors are higher priority than others. Order of these if/else if is important.
if (r0==TOKUDB_DICTIONARY_TOO_NEW || r1==TOKUDB_DICTIONARY_TOO_NEW) if (r0==TOKUDB_DICTIONARY_TOO_NEW || r1==TOKUDB_DICTIONARY_TOO_NEW)
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
BOOL ignore_if_was_already_open; BOOL ignore_if_was_already_open;
int toku_testsetup_leaf(BRT brt, BLOCKNUM *blocknum) { int toku_testsetup_leaf(BRT brt, BLOCKNUM *blocknum) {
BRTNODE node; BRTNODE node;
int r = toku_read_brt_header_and_store_in_cachefile(brt->cf, &brt->h, &ignore_if_was_already_open); int r = toku_read_brt_header_and_store_in_cachefile(brt->cf, ZERO_LSN, &brt->h, &ignore_if_was_already_open);
if (r!=0) return r; if (r!=0) return r;
toku_create_new_brtnode(brt, &node, 0, 0); toku_create_new_brtnode(brt, &node, 0, 0);
...@@ -22,7 +22,7 @@ int toku_testsetup_leaf(BRT brt, BLOCKNUM *blocknum) { ...@@ -22,7 +22,7 @@ int toku_testsetup_leaf(BRT brt, BLOCKNUM *blocknum) {
int toku_testsetup_nonleaf (BRT brt, int height, BLOCKNUM *blocknum, int n_children, BLOCKNUM *children, u_int32_t *subtree_fingerprints, char **keys, int *keylens) { int toku_testsetup_nonleaf (BRT brt, int height, BLOCKNUM *blocknum, int n_children, BLOCKNUM *children, u_int32_t *subtree_fingerprints, char **keys, int *keylens) {
BRTNODE node; BRTNODE node;
assert(n_children<=BRT_FANOUT); assert(n_children<=BRT_FANOUT);
int r = toku_read_brt_header_and_store_in_cachefile(brt->cf, &brt->h, &ignore_if_was_already_open); int r = toku_read_brt_header_and_store_in_cachefile(brt->cf, ZERO_LSN, &brt->h, &ignore_if_was_already_open);
if (r!=0) return r; if (r!=0) return r;
toku_create_new_brtnode(brt, &node, height, 0); toku_create_new_brtnode(brt, &node, height, 0);
node->u.n.n_children=n_children; node->u.n.n_children=n_children;
...@@ -47,7 +47,7 @@ int toku_testsetup_nonleaf (BRT brt, int height, BLOCKNUM *blocknum, int n_child ...@@ -47,7 +47,7 @@ int toku_testsetup_nonleaf (BRT brt, int height, BLOCKNUM *blocknum, int n_child
} }
int toku_testsetup_root(BRT brt, BLOCKNUM blocknum) { int toku_testsetup_root(BRT brt, BLOCKNUM blocknum) {
int r = toku_read_brt_header_and_store_in_cachefile(brt->cf, &brt->h, &ignore_if_was_already_open); int r = toku_read_brt_header_and_store_in_cachefile(brt->cf, ZERO_LSN, &brt->h, &ignore_if_was_already_open);
if (r!=0) return r; if (r!=0) return r;
brt->h->root = blocknum; brt->h->root = blocknum;
brt->h->root_hash.valid = FALSE; brt->h->root_hash.valid = FALSE;
......
...@@ -3156,9 +3156,10 @@ int toku_brt_alloc_init_header(BRT t, TOKUTXN txn) { ...@@ -3156,9 +3156,10 @@ int toku_brt_alloc_init_header(BRT t, TOKUTXN txn) {
return r; return r;
} }
int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header **header, BOOL* was_open) int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, LSN required_lsn, struct brt_header **header, BOOL* was_open)
// If the cachefile already has the header, then just get it. // If the cachefile already has the header, then just get it.
// If the cachefile has not been initialized, then don't modify anything. // If the cachefile has not been initialized, then don't modify anything.
// required_lsn is used if a particular version of the file is required based on checkpoint_lsn
{ {
{ {
struct brt_header *h; struct brt_header *h;
...@@ -3173,7 +3174,7 @@ int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header ...@@ -3173,7 +3174,7 @@ int toku_read_brt_header_and_store_in_cachefile (CACHEFILE cf, struct brt_header
int r; int r;
{ {
int fd = toku_cachefile_get_and_pin_fd (cf); int fd = toku_cachefile_get_and_pin_fd (cf);
r = toku_deserialize_brtheader_from(fd, &h); r = toku_deserialize_brtheader_from(fd, required_lsn, &h);
toku_cachefile_unpin_fd(cf); toku_cachefile_unpin_fd(cf);
} }
if (r!=0) return r; if (r!=0) return r;
...@@ -3273,8 +3274,9 @@ cleanup: ...@@ -3273,8 +3274,9 @@ cleanup:
// This is the actual open, used for various purposes, such as normal use, recovery, and redirect. // This is the actual open, used for various purposes, such as normal use, recovery, and redirect.
// fname_in_env is the iname, relative to the env_dir (data_dir is already in iname as prefix) // fname_in_env is the iname, relative to the env_dir (data_dir is already in iname as prefix)
// If required_lsn is specified (not ZERO_LSN), then that specific checkpointed version is required.
static int static int
brt_open(BRT t, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, DB *db, FILENUM use_filenum, DICTIONARY_ID use_dictionary_id) { brt_open(BRT t, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, DB *db, FILENUM use_filenum, DICTIONARY_ID use_dictionary_id, LSN required_lsn) {
int r; int r;
BOOL txn_created = FALSE; BOOL txn_created = FALSE;
...@@ -3340,7 +3342,7 @@ brt_open(BRT t, const char *fname_in_env, int is_create, int only_create, CACHET ...@@ -3340,7 +3342,7 @@ brt_open(BRT t, const char *fname_in_env, int is_create, int only_create, CACHET
} }
BOOL was_already_open; BOOL was_already_open;
if (is_create) { if (is_create) {
r = toku_read_brt_header_and_store_in_cachefile(t->cf, &t->h, &was_already_open); r = toku_read_brt_header_and_store_in_cachefile(t->cf, required_lsn, &t->h, &was_already_open);
if (r==TOKUDB_DICTIONARY_NO_HEADER) { if (r==TOKUDB_DICTIONARY_NO_HEADER) {
r = toku_brt_alloc_init_header(t, txn); r = toku_brt_alloc_init_header(t, txn);
if (r != 0) goto died_after_read_and_pin; if (r != 0) goto died_after_read_and_pin;
...@@ -3355,7 +3357,7 @@ brt_open(BRT t, const char *fname_in_env, int is_create, int only_create, CACHET ...@@ -3355,7 +3357,7 @@ brt_open(BRT t, const char *fname_in_env, int is_create, int only_create, CACHET
} }
else goto found_it; else goto found_it;
} else { } else {
if ((r = toku_read_brt_header_and_store_in_cachefile(t->cf, &t->h, &was_already_open))!=0) goto died_after_open; if ((r = toku_read_brt_header_and_store_in_cachefile(t->cf, required_lsn, &t->h, &was_already_open))!=0) goto died_after_open;
found_it: found_it:
t->nodesize = t->h->nodesize; /* inherit the pagesize from the file */ t->nodesize = t->h->nodesize; /* inherit the pagesize from the file */
if (!t->did_set_flags) { if (!t->did_set_flags) {
...@@ -3426,13 +3428,16 @@ brt_open(BRT t, const char *fname_in_env, int is_create, int only_create, CACHET ...@@ -3426,13 +3428,16 @@ brt_open(BRT t, const char *fname_in_env, int is_create, int only_create, CACHET
return 0; return 0;
} }
// Open a brt for the purpose of recovery, which requires that the brt be open to a pre-determined FILENUM. (dict_id is assigned by the brt_open() function.) // Open a brt for the purpose of recovery, which requires that the brt be open to a pre-determined FILENUM
// and may require a specific checkpointed version of the file.
// (dict_id is assigned by the brt_open() function.)
int int
toku_brt_open_recovery(BRT t, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, DB *db, FILENUM use_filenum) { toku_brt_open_recovery(BRT t, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn,
DB *db, FILENUM use_filenum, LSN required_lsn) {
int r; int r;
lazy_assert(use_filenum.fileid != FILENUM_NONE.fileid); lazy_assert(use_filenum.fileid != FILENUM_NONE.fileid);
r = brt_open(t, fname_in_env, is_create, only_create, cachetable, r = brt_open(t, fname_in_env, is_create, only_create, cachetable,
txn, db, use_filenum, DICTIONARY_ID_NONE); txn, db, use_filenum, DICTIONARY_ID_NONE, required_lsn);
return r; return r;
} }
...@@ -3440,7 +3445,7 @@ toku_brt_open_recovery(BRT t, const char *fname_in_env, int is_create, int only_ ...@@ -3440,7 +3445,7 @@ toku_brt_open_recovery(BRT t, const char *fname_in_env, int is_create, int only_
int int
toku_brt_open(BRT t, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, DB *db) { toku_brt_open(BRT t, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, DB *db) {
int r; int r;
r = brt_open(t, fname_in_env, is_create, only_create, cachetable, txn, db, FILENUM_NONE, DICTIONARY_ID_NONE); r = brt_open(t, fname_in_env, is_create, only_create, cachetable, txn, db, FILENUM_NONE, DICTIONARY_ID_NONE, ZERO_LSN);
return r; return r;
} }
...@@ -3462,7 +3467,7 @@ brt_open_for_redirect(BRT *new_brtp, const char *fname_in_env, TOKUTXN txn, BRT ...@@ -3462,7 +3467,7 @@ brt_open_for_redirect(BRT *new_brtp, const char *fname_in_env, TOKUTXN txn, BRT
lazy_assert_zero(r); lazy_assert_zero(r);
} }
CACHETABLE ct = toku_cachefile_get_cachetable(old_brt->cf); CACHETABLE ct = toku_cachefile_get_cachetable(old_brt->cf);
r = brt_open(t, fname_in_env, 0, 0, ct, txn, old_brt->db, FILENUM_NONE, old_h->dict_id); r = brt_open(t, fname_in_env, 0, 0, ct, txn, old_brt->db, FILENUM_NONE, old_h->dict_id, ZERO_LSN);
lazy_assert_zero(r); lazy_assert_zero(r);
if (old_h->descriptor.version==0) { if (old_h->descriptor.version==0) {
lazy_assert(t->h->descriptor.version == 0); lazy_assert(t->h->descriptor.version == 0);
......
...@@ -54,8 +54,8 @@ brt_compare_func toku_brt_get_bt_compare (BRT brt); ...@@ -54,8 +54,8 @@ brt_compare_func toku_brt_get_bt_compare (BRT brt);
int brt_set_cachetable(BRT, CACHETABLE); int brt_set_cachetable(BRT, CACHETABLE);
int toku_brt_open(BRT, const char *fname_in_env, int toku_brt_open(BRT, const char *fname_in_env,
int is_create, int only_create, CACHETABLE ct, TOKUTXN txn, DB *db) __attribute__ ((warn_unused_result)); int is_create, int only_create, CACHETABLE ct, TOKUTXN txn, DB *db) __attribute__ ((warn_unused_result));
int toku_brt_open_recovery(BRT, const char *fname_in_env, int toku_brt_open_recovery(BRT, const char *fname_in_env, int is_create, int only_create, CACHETABLE ct, TOKUTXN txn,
int is_create, int only_create, CACHETABLE ct, TOKUTXN txn, DB *db, FILENUM use_filenum) __attribute__ ((warn_unused_result)); DB *db, FILENUM use_filenum, LSN required_lsn) __attribute__ ((warn_unused_result));
int toku_brt_remove_subdb(BRT brt, const char *dbname, u_int32_t flags) __attribute__ ((warn_unused_result)); int toku_brt_remove_subdb(BRT brt, const char *dbname, u_int32_t flags) __attribute__ ((warn_unused_result));
......
...@@ -76,7 +76,8 @@ static void ...@@ -76,7 +76,8 @@ static void
dump_header (int f, struct brt_header **header, CACHEFILE cf) { dump_header (int f, struct brt_header **header, CACHEFILE cf) {
struct brt_header *h; struct brt_header *h;
int r; int r;
r = toku_deserialize_brtheader_from (f, &h); assert(r==0); r = toku_deserialize_brtheader_from (f, ZERO_LSN, &h);
assert(r==0);
h->cf = cf; h->cf = cf;
printf("brtheader:\n"); printf("brtheader:\n");
printf(" layout_version=%d\n", h->layout_version); printf(" layout_version=%d\n", h->layout_version);
......
...@@ -28,6 +28,7 @@ struct scan_state { ...@@ -28,6 +28,7 @@ struct scan_state {
FORWARD_BETWEEN_CHECKPOINT_BEGIN_END, FORWARD_BETWEEN_CHECKPOINT_BEGIN_END,
FORWARD_NEWER_CHECKPOINT_END, FORWARD_NEWER_CHECKPOINT_END,
} ss; } ss;
LSN incomplete_checkpoint_begin_lsn; // lsn of checkpoint_begin that did not complete
LSN checkpoint_begin_lsn; LSN checkpoint_begin_lsn;
LSN checkpoint_end_lsn; LSN checkpoint_end_lsn;
uint64_t checkpoint_end_timestamp; uint64_t checkpoint_end_timestamp;
...@@ -238,7 +239,7 @@ static void recover_yield(voidfp f, void *fpthunk, void *UU(yieldthunk)) { ...@@ -238,7 +239,7 @@ static void recover_yield(voidfp f, void *fpthunk, void *UU(yieldthunk)) {
// Open the file if it is not already open. If it is already open, then do nothing. // Open the file if it is not already open. If it is already open, then do nothing.
static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, BOOL must_create, int mode, BYTESTRING *bs_iname, FILENUM filenum, u_int32_t treeflags, static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, BOOL must_create, int mode, BYTESTRING *bs_iname, FILENUM filenum, u_int32_t treeflags,
u_int32_t descriptor_version, BYTESTRING* descriptor, TOKUTXN txn, uint32_t nodesize) { u_int32_t descriptor_version, BYTESTRING* descriptor, TOKUTXN txn, uint32_t nodesize, LSN required_lsn) {
int r; int r;
char *iname = fixup_fname(bs_iname); char *iname = fixup_fname(bs_iname);
...@@ -271,7 +272,7 @@ static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, BOOL must_create ...@@ -271,7 +272,7 @@ static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, BOOL must_create
r = toku_brt_set_descriptor(brt, descriptor_version, &descriptor_dbt); r = toku_brt_set_descriptor(brt, descriptor_version, &descriptor_dbt);
if (r != 0) goto close_brt; if (r != 0) goto close_brt;
} }
r = toku_brt_open_recovery(brt, iname, must_create, must_create, renv->ct, txn, fake_db, filenum); r = toku_brt_open_recovery(brt, iname, must_create, must_create, renv->ct, txn, fake_db, filenum, required_lsn);
if (r != 0) { if (r != 0) {
close_brt: close_brt:
; ;
...@@ -338,7 +339,9 @@ static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoi ...@@ -338,7 +339,9 @@ static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoi
fprintf(stderr, "%.24s Tokudb recovery bw_begin_checkpoint at %"PRIu64" timestamp %"PRIu64" (%s)\n", ctime(&tnow), l->lsn.lsn, l->timestamp, recover_state(renv)); fprintf(stderr, "%.24s Tokudb recovery bw_begin_checkpoint at %"PRIu64" timestamp %"PRIu64" (%s)\n", ctime(&tnow), l->lsn.lsn, l->timestamp, recover_state(renv));
switch (renv->ss.ss) { switch (renv->ss.ss) {
case BACKWARD_NEWER_CHECKPOINT_END: case BACKWARD_NEWER_CHECKPOINT_END:
r = 0; // incomplete checkpoint. Nothing to do. // incomplete checkpoint, useful for accountability
renv->ss.incomplete_checkpoint_begin_lsn = l->lsn;
r = 0;
break; break;
case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END: case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END:
assert(l->lsn.lsn == renv->ss.checkpoint_begin_lsn.lsn); assert(l->lsn.lsn == renv->ss.checkpoint_begin_lsn.lsn);
...@@ -409,13 +412,20 @@ static int toku_recover_fassociate (struct logtype_fassociate *l, RECOVER_ENV re ...@@ -409,13 +412,20 @@ static int toku_recover_fassociate (struct logtype_fassociate *l, RECOVER_ENV re
renv->ss.checkpoint_num_fassociate++; renv->ss.checkpoint_num_fassociate++;
assert(r==DB_NOTFOUND); //Not open assert(r==DB_NOTFOUND); //Not open
// open it if it exists // open it if it exists
r = internal_recover_fopen_or_fcreate(renv, FALSE, 0, &l->iname, l->filenum, l->treeflags, 0, NULL, NULL, 0); // if rollback file, specify which checkpointed version of file we need (not just the latest)
if (r==0 && !strcmp(fname, ROLLBACK_CACHEFILE_NAME)) { {
BOOL rollback_file = !strcmp(fname, ROLLBACK_CACHEFILE_NAME);
LSN required_lsn = ZERO_LSN;
if (rollback_file)
required_lsn = renv->ss.checkpoint_begin_lsn;
r = internal_recover_fopen_or_fcreate(renv, FALSE, 0, &l->iname, l->filenum, l->treeflags, 0, NULL, NULL, 0, required_lsn);
if (r==0 && rollback_file) {
//Load rollback cachefile //Load rollback cachefile
r = file_map_find(&renv->fmap, l->filenum, &tuple); r = file_map_find(&renv->fmap, l->filenum, &tuple);
assert(r==0); assert(r==0);
renv->logger->rollback_cachefile = tuple->brt->cf; renv->logger->rollback_cachefile = tuple->brt->cf;
} }
}
break; break;
case FORWARD_NEWER_CHECKPOINT_END: case FORWARD_NEWER_CHECKPOINT_END:
if (r == 0) { //IF it is open if (r == 0) { //IF it is open
...@@ -637,7 +647,7 @@ static int toku_recover_fcreate (struct logtype_fcreate *l, RECOVER_ENV renv) { ...@@ -637,7 +647,7 @@ static int toku_recover_fcreate (struct logtype_fcreate *l, RECOVER_ENV renv) {
toku_free(iname); toku_free(iname);
BOOL must_create = TRUE; BOOL must_create = TRUE;
r = internal_recover_fopen_or_fcreate(renv, must_create, l->mode, &l->iname, l->filenum, l->treeflags, l->descriptor_version, &l->descriptor, txn, l->nodesize); r = internal_recover_fopen_or_fcreate(renv, must_create, l->mode, &l->iname, l->filenum, l->treeflags, l->descriptor_version, &l->descriptor, txn, l->nodesize, ZERO_LSN);
return r; return r;
} }
...@@ -664,7 +674,7 @@ static int toku_recover_fopen (struct logtype_fopen *l, RECOVER_ENV renv) { ...@@ -664,7 +674,7 @@ static int toku_recover_fopen (struct logtype_fopen *l, RECOVER_ENV renv) {
if (strcmp(fname, ROLLBACK_CACHEFILE_NAME)) { if (strcmp(fname, ROLLBACK_CACHEFILE_NAME)) {
//Rollback cachefile can only be opened via fassociate. //Rollback cachefile can only be opened via fassociate.
r = internal_recover_fopen_or_fcreate(renv, must_create, 0, &l->iname, l->filenum, l->treeflags, descriptor_version, descriptor, txn, 0); r = internal_recover_fopen_or_fcreate(renv, must_create, 0, &l->iname, l->filenum, l->treeflags, descriptor_version, descriptor, txn, 0, ZERO_LSN);
} }
toku_free(fname); toku_free(fname);
return r; return r;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment