recover.cc 53.6 KB
Newer Older
1 2
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
Bradley C. Kuszmaul's avatar
Bradley C. Kuszmaul committed
3
#ident "$Id$"
4
#ident "Copyright (c) 2007-2012 Tokutek Inc.  All rights reserved."
5
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
6

7
#include <ft/log_header.h>
8 9 10 11 12

#include "ft.h"
#include "log-internal.h"
#include "logcursor.h"
#include "cachetable.h"
13
#include "checkpoint.h"
14
#include "txn_manager.h"
15

16 17
// File name (with a leading '/') of the recovery lock file.
// NOTE(review): presumably appended to a directory path by code outside this view.
static const char recovery_lock_file[] = "/__tokudb_recoverylock_dont_delete_me";

18
// When nonzero, recover_env_init and recover_env_cleanup print a
// "<function>:<line>" trace line to stderr.
int tokudb_recovery_trace = 0;                    // turn on recovery tracing, default off.
19

20
// Uncomment DO_VERIFY_COUNTS to make VERIFY_COUNTS(n) call
// toku_verify_or_set_counts(n, false).  When it is not defined the macro
// expands to a no-op and its argument is NOT evaluated.
//#define DO_VERIFY_COUNTS
#ifdef DO_VERIFY_COUNTS
#define VERIFY_COUNTS(n) toku_verify_or_set_counts(n, false)
#else
#define VERIFY_COUNTS(n) ((void)0)
#endif

27
// time in seconds between recovery progress reports
#define TOKUDB_RECOVERY_PROGRESS_TIME 15

30 31 32 33 34 35 36
// Phase of the recovery log scan.  The handlers in this file move through the
// phases in numeric order:
//   1 -> 2 when an end_checkpoint is seen scanning backward,
//   2 -> 3 when the matching begin_checkpoint is seen scanning backward,
//   3 -> 4 when the end_checkpoint is seen again scanning forward.
// The values are also used as indexes into scan_state_strings, so they must
// stay contiguous and start at 1.
enum ss {
    BACKWARD_NEWER_CHECKPOINT_END         = 1,
    BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END = 2,
    FORWARD_BETWEEN_CHECKPOINT_BEGIN_END  = 3,
    FORWARD_NEWER_CHECKPOINT_END          = 4
};

37
struct scan_state {
38
    enum ss ss;
39 40 41 42 43 44
    LSN checkpoint_begin_lsn;
    LSN checkpoint_end_lsn;
    uint64_t checkpoint_end_timestamp;
    uint64_t checkpoint_begin_timestamp;
    uint32_t checkpoint_num_fassociate;
    uint32_t checkpoint_num_xstillopen;
45
    TXNID last_xid;
46 47
};

48 49 50 51
// Short printable names for enum ss values, indexed by the enum value.
// Slot 0 is a placeholder because the enum starts at 1.
static const char *scan_state_strings[] = {
    "?",            // 0: unused
    "bw_newer",     // 1: BACKWARD_NEWER_CHECKPOINT_END
    "bw_between",   // 2: BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END
    "fw_between",   // 3: FORWARD_BETWEEN_CHECKPOINT_BEGIN_END
    "fw_newer"      // 4: FORWARD_NEWER_CHECKPOINT_END
};

52
static void scan_state_init(struct scan_state *ss) {
53 54 55 56 57
    ss->ss = BACKWARD_NEWER_CHECKPOINT_END;
    ss->checkpoint_begin_lsn = ZERO_LSN;
    ss->checkpoint_end_lsn = ZERO_LSN;
    ss->checkpoint_num_fassociate = 0;
    ss->checkpoint_num_xstillopen = 0;
58
    ss->last_xid = 0;
59
}
60

61 62 63
// Return the short printable name of the current scan phase.
// Asserts that the phase is one of the four valid enum ss values before
// using it as an index into scan_state_strings.
static const char *scan_state_string(struct scan_state *ss) {
    assert(BACKWARD_NEWER_CHECKPOINT_END <= ss->ss && ss->ss <= FORWARD_NEWER_CHECKPOINT_END);
    return scan_state_strings[ss->ss];
}

66 67 68
// File map tuple
struct file_map_tuple {
    FILENUM filenum;
69
    FT_HANDLE ft_handle;     // NULL ft_handle means it's a rollback file.
70
    char *iname;
71
    struct __toku_db fake_db;
72 73
};

74
// Fill in a freshly allocated tuple.
//   tuple   - storage to initialize
//   filenum - file number the tuple is keyed by
//   brt     - open handle; dereferenced here, so it must not be NULL
//   iname   - name string; the pointer is stored, not copied (file_map_tuple_destroy frees it)
static void file_map_tuple_init(struct file_map_tuple *tuple, FILENUM filenum, FT_HANDLE brt, char *iname) {
    tuple->filenum = filenum;
    tuple->ft_handle = brt;
    tuple->iname = iname;
    // use a fake DB for comparisons, using the ft's cmp descriptor
    memset(&tuple->fake_db, 0, sizeof(tuple->fake_db));
    tuple->fake_db.cmp_descriptor = &tuple->ft_handle->ft->cmp_descriptor;
    tuple->fake_db.descriptor = &tuple->ft_handle->ft->descriptor;
}

static void file_map_tuple_destroy(struct file_map_tuple *tuple) {
85 86 87
    if (tuple->iname) {
        toku_free(tuple->iname);
        tuple->iname = NULL;
88 89 90
    }
}

91
// Map filenum to brt
Rich Prohaska's avatar
Rich Prohaska committed
92
struct file_map {
93
    OMT filenums;
94 95
};

96 97 98 99 100 101 102
// The recovery environment
struct recover_env {
    DB_ENV *env;
    prepared_txn_callback_t    prepared_txn_callback;    // at the end of recovery, all the prepared txns are passed back to the ydb layer to make them into valid transactions.
    keep_cachetable_callback_t keep_cachetable_callback; // after recovery, store the cachetable into the environment.
    CACHETABLE ct;
    TOKULOGGER logger;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
103
    CHECKPOINTER cp;
104 105
    ft_compare_func bt_compare;
    ft_update_func update_function;
106 107 108 109
    generate_row_for_put_func generate_row_for_put;
    generate_row_for_del_func generate_row_for_del;
    struct scan_state ss;
    struct file_map fmap;
Yoni Fogel's avatar
Yoni Fogel committed
110
    bool goforward;
111 112 113 114 115
    bool destroy_logger_at_end; // If true then destroy the logger when we are done.  If false then set the logger into write-files mode when we are done with recovery.*/
};
typedef struct recover_env *RECOVER_ENV;


Rich Prohaska's avatar
Rich Prohaska committed
116
static void file_map_init(struct file_map *fmap) {
117 118 119 120 121 122
    int r = toku_omt_create(&fmap->filenums);
    assert(r == 0);
}

// Destroy the OMT behind a file map.  Tuples still stored in it are not
// released here; recover_env_cleanup asserts the map is empty first.
static void file_map_destroy(struct file_map *fmap) {
    toku_omt_destroy(&fmap->filenums);
}

125 126 127 128
// Number of tuples (open dictionaries) currently held in the file map.
static uint32_t file_map_get_num_dictionaries(struct file_map *fmap) {
    uint32_t count = toku_omt_size(fmap->filenums);
    return count;
}

129
static void file_map_close_dictionaries(struct file_map *fmap, LSN oplsn) {
130 131
    int r;

132
    while (1) {
Yoni Fogel's avatar
Yoni Fogel committed
133
        uint32_t n = toku_omt_size(fmap->filenums);
134 135 136
        if (n == 0)
            break;
        OMTVALUE v;
137
        r = toku_omt_fetch(fmap->filenums, n-1, &v);
138 139 140
        assert(r == 0);
        r = toku_omt_delete_at(fmap->filenums, n-1);
        assert(r == 0);
141
        struct file_map_tuple *CAST_FROM_VOIDP(tuple, v);
142 143 144
        assert(tuple->ft_handle);
        // Logging is on again, but we must pass the right LSN into close.
        if (tuple->ft_handle) { // it's a DB, not a rollback file
145
            toku_ft_handle_close_recovery(tuple->ft_handle, oplsn);
146
        }
147 148
        file_map_tuple_destroy(tuple);
        toku_free(tuple);
149 150 151
    }
}

152
// OMT ordering/search function for the file map.
//   omtv - a stored struct file_map_tuple *
//   v    - a FILENUM * being searched for
// Returns -1, 0 or +1 as the tuple's fileid is less than, equal to or greater
// than the searched fileid.
static int file_map_h(OMTVALUE omtv, void *v) {
    struct file_map_tuple *CAST_FROM_VOIDP(a, omtv);
    FILENUM *CAST_FROM_VOIDP(b, v);
    if (a->filenum.fileid < b->fileid) return -1;
    if (a->filenum.fileid > b->fileid) return +1;
    return 0;
}

160
static int file_map_insert (struct file_map *fmap, FILENUM fnum, FT_HANDLE brt, char *iname) {
161
    struct file_map_tuple *XMALLOC(tuple);
162
    file_map_tuple_init(tuple, fnum, brt, iname);
163 164 165 166 167
    int r = toku_omt_insert(fmap->filenums, tuple, file_map_h, &fnum, NULL);
    return r;
}

static void file_map_remove(struct file_map *fmap, FILENUM fnum) {
Yoni Fogel's avatar
Yoni Fogel committed
168
    OMTVALUE v; uint32_t idx;
169
    int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx);
170
    if (r == 0) {
171
        struct file_map_tuple *CAST_FROM_VOIDP(tuple, v);
172 173 174 175 176 177
        r = toku_omt_delete_at(fmap->filenums, idx);
        file_map_tuple_destroy(tuple);
        toku_free(tuple);
    }
}

178
// Look up file info: given FILENUM, return file_map_tuple (or DB_NOTFOUND)
179
static int file_map_find(struct file_map *fmap, FILENUM fnum, struct file_map_tuple **file_map_tuple) {
Yoni Fogel's avatar
Yoni Fogel committed
180
    OMTVALUE v; uint32_t idx;
181
    int r = toku_omt_find_zero(fmap->filenums, file_map_h, &fnum, &v, &idx);
182
    if (r == 0) {
183
        struct file_map_tuple *CAST_FROM_VOIDP(tuple, v);
184 185
        assert(tuple->filenum.fileid == fnum.fileid);
        *file_map_tuple = tuple;
186
    }
187
    else assert(r==DB_NOTFOUND);
188
    return r;
189 190
}

191
static int recover_env_init (RECOVER_ENV renv,
192 193 194 195 196 197
                             const char *env_dir,
                             DB_ENV *env,
                             prepared_txn_callback_t    prepared_txn_callback,
                             keep_cachetable_callback_t keep_cachetable_callback,
                             TOKULOGGER logger,
                             ft_compare_func bt_compare,
198
                             ft_update_func update_function,
199 200
                             generate_row_for_put_func generate_row_for_put,
                             generate_row_for_del_func generate_row_for_del,
201
                             size_t cachetable_size) {
202
    int r = 0;
203

204 205 206
    // If we are passed a logger use it, otherwise create one.
    renv->destroy_logger_at_end = logger==NULL;
    if (logger) {
207
        renv->logger = logger;
208
    } else {
209 210
        r = toku_logger_create(&renv->logger);
        assert(r == 0);
211
    }
Yoni Fogel's avatar
Yoni Fogel committed
212
    toku_logger_write_log_files(renv->logger, false);
213
    toku_cachetable_create(&renv->ct, cachetable_size ? cachetable_size : 1<<25, (LSN){0}, renv->logger);
214 215
    toku_cachetable_set_env_dir(renv->ct, env_dir);
    if (keep_cachetable_callback) keep_cachetable_callback(env, renv->ct);
216
    toku_logger_set_cachetable(renv->logger, renv->ct);
217 218 219 220 221 222 223
    renv->env                      = env;
    renv->prepared_txn_callback    = prepared_txn_callback;
    renv->keep_cachetable_callback = keep_cachetable_callback;
    renv->bt_compare               = bt_compare;
    renv->update_function          = update_function;
    renv->generate_row_for_put     = generate_row_for_put;
    renv->generate_row_for_del     = generate_row_for_del;
224
    file_map_init(&renv->fmap);
Yoni Fogel's avatar
Yoni Fogel committed
225
    renv->goforward = false;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
226
    renv->cp = toku_cachetable_get_checkpointer(renv->ct);
227 228
    if (tokudb_recovery_trace)
        fprintf(stderr, "%s:%d\n", __FUNCTION__, __LINE__);
229 230 231
    return r;
}

232
// Tear down a recovery environment.
// The file map must already be empty (asserted).  A logger created by
// recover_env_init is closed; a caller-supplied logger is switched back to
// writing log files.  The cachetable is closed unless a keep_cachetable
// callback took it, in which case only the local pointer is cleared.
static void recover_env_cleanup (RECOVER_ENV renv) {
    int r;

    assert(toku_omt_size(renv->fmap.filenums)==0);
    file_map_destroy(&renv->fmap);

    if (renv->destroy_logger_at_end) {
        toku_logger_close_rollback(renv->logger);
        r = toku_logger_close(&renv->logger);
        assert(r == 0);
    } else {
        toku_logger_write_log_files(renv->logger, true);
    }

    if (renv->keep_cachetable_callback) {
        renv->ct = NULL;
    } else {
        toku_cachetable_close(&renv->ct);
    }

    if (tokudb_recovery_trace)
        fprintf(stderr, "%s:%d\n", __FUNCTION__, __LINE__);
}

256
// Printable name of the recovery environment's current scan phase.
static const char *recover_state(RECOVER_ENV renv) {
    return scan_state_string(&renv->ss);
}

260
// Open the file if it is not already open.  If it is already open, then do nothing.
Yoni Fogel's avatar
Yoni Fogel committed
261
static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, bool must_create, int UU(mode), BYTESTRING *bs_iname, FILENUM filenum, uint32_t treeflags,
262
                                              TOKUTXN txn, uint32_t nodesize, uint32_t basementnodesize, enum toku_compression_method compression_method, LSN max_acceptable_lsn) {
263
    int r = 0;
264
    FT_HANDLE brt = NULL;
265
    char *iname = fixup_fname(bs_iname);
Rich Prohaska's avatar
Rich Prohaska committed
266

267
    toku_ft_handle_create(&brt);
268
    toku_ft_set_flags(brt, treeflags);
269 270

    if (nodesize != 0) {
271
        toku_ft_handle_set_nodesize(brt, nodesize);
272
    }
273

274
    if (basementnodesize != 0) {
275
        toku_ft_handle_set_basementnodesize(brt, basementnodesize);
276 277
    }

278
    if (compression_method != TOKU_DEFAULT_COMPRESSION_METHOD) {
279
        toku_ft_handle_set_compression_method(brt, compression_method);
280 281
    }

282
    // set the key compare functions
283
    if (!(treeflags & TOKU_DB_KEYCMP_BUILTIN) && renv->bt_compare) {
284
        toku_ft_set_bt_compare(brt, renv->bt_compare);
285
    }
286

287
    if (renv->update_function) {
288
        toku_ft_set_update(brt, renv->update_function);
289 290
    }

Yoni Fogel's avatar
Yoni Fogel committed
291
    // TODO mode (FUTURE FEATURE)
292
    //mode = mode;
Rich Prohaska's avatar
Rich Prohaska committed
293

294
    r = toku_ft_handle_open_recovery(brt, iname, must_create, must_create, renv->ct, txn, filenum, max_acceptable_lsn);
295
    if (r != 0) {
296
        //Note:  If ft_handle_open fails, then close_ft will NOT write a header to disk.
297 298
        //No need to provide lsn, so use the regular toku_ft_handle_close function
        toku_ft_handle_close(brt);
299 300 301 302
        toku_free(iname);
        if (r == ENOENT) //Not an error to simply be missing.
            r = 0;
        return r;
303
    }
Rich Prohaska's avatar
Rich Prohaska committed
304

305
    file_map_insert(&renv->fmap, filenum, brt, iname);
306
    return 0;
307 308
}

309 310
static int toku_recover_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) {
    int r;
311
    TXN_MANAGER mgr = toku_logger_get_txn_manager(renv->logger);
312 313 314
    switch (renv->ss.ss) {
    case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
        assert(l->lsn.lsn == renv->ss.checkpoint_begin_lsn.lsn);
315 316 317 318
        invariant(renv->ss.last_xid == TXNID_NONE);
        renv->ss.last_xid = l->last_xid;
        toku_txn_manager_set_last_xid_from_recovered_checkpoint(mgr, l->last_xid);

319
        r = 0;
320 321 322
        break;
    case FORWARD_NEWER_CHECKPOINT_END:
        assert(l->lsn.lsn > renv->ss.checkpoint_end_lsn.lsn);
323 324 325 326 327
        // Verify last_xid is no older than the previous begin
        invariant(l->last_xid >= renv->ss.last_xid);
        // Verify last_xid is no older than the newest txn
        invariant(l->last_xid >= toku_txn_manager_get_last_xid(mgr));

328 329 330 331 332 333 334 335 336 337 338 339 340
        r = 0; // ignore it (log only has a begin checkpoint)
        break;
    default:
        fprintf(stderr, "Tokudb recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)renv->ss.ss);
        abort();
        break;
    }
    return r;
}

static int toku_recover_backward_begin_checkpoint (struct logtype_begin_checkpoint *l, RECOVER_ENV renv) {
    int r;
    time_t tnow = time(NULL);
341
    fprintf(stderr, "%.24s Tokudb recovery bw_begin_checkpoint at %" PRIu64 " timestamp %" PRIu64 " (%s)\n", ctime(&tnow), l->lsn.lsn, l->timestamp, recover_state(renv));
342 343
    switch (renv->ss.ss) {
    case BACKWARD_NEWER_CHECKPOINT_END:
344
        // incomplete checkpoint, nothing to do
345
        r = 0;
346 347 348 349 350
        break;
    case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END:
        assert(l->lsn.lsn == renv->ss.checkpoint_begin_lsn.lsn);
        renv->ss.ss = FORWARD_BETWEEN_CHECKPOINT_BEGIN_END;
        renv->ss.checkpoint_begin_timestamp = l->timestamp;
Yoni Fogel's avatar
Yoni Fogel committed
351
        renv->goforward = true;
352
        tnow = time(NULL);
353
        fprintf(stderr, "%.24s Tokudb recovery turning around at begin checkpoint %" PRIu64 " time %" PRIu64 "\n", 
354
                ctime(&tnow), l->lsn.lsn, 
355
                renv->ss.checkpoint_end_timestamp - renv->ss.checkpoint_begin_timestamp);
356 357 358 359 360 361 362 363 364 365 366 367 368 369
        r = 0;
        break;
    default:
        fprintf(stderr, "Tokudb recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)renv->ss.ss);
        abort();
        break;
    }
    return r;
}

// Handle an end_checkpoint log entry.
// Only legal between checkpoint begin and end: the entry must match the LSNs
// recorded on the backward scan and the fassociate/xstillopen counts gathered
// on the way forward.  The phase then becomes FORWARD_NEWER_CHECKPOINT_END.
// Any other phase trips an assert.  Returns 0.
static int toku_recover_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV renv) {
    int r;
    switch (renv->ss.ss) {
    case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
        assert(l->lsn_begin_checkpoint.lsn == renv->ss.checkpoint_begin_lsn.lsn);
        assert(l->lsn.lsn == renv->ss.checkpoint_end_lsn.lsn);
        assert(l->num_fassociate_entries == renv->ss.checkpoint_num_fassociate);
        assert(l->num_xstillopen_entries == renv->ss.checkpoint_num_xstillopen);
        renv->ss.ss = FORWARD_NEWER_CHECKPOINT_END;
        r = 0;
        break;
    case FORWARD_NEWER_CHECKPOINT_END:
        assert(0);
        return 0;
    default:
        assert(0);
        return 0;
    }
    return r;
}

static int toku_recover_backward_end_checkpoint (struct logtype_end_checkpoint *l, RECOVER_ENV renv) {
    time_t tnow = time(NULL);
389
    fprintf(stderr, "%.24s Tokudb recovery bw_end_checkpoint at %" PRIu64 " timestamp %" PRIu64 " xid %" PRIu64 " (%s)\n", ctime(&tnow), l->lsn.lsn, l->timestamp, l->lsn_begin_checkpoint.lsn, recover_state(renv));
390 391
    switch (renv->ss.ss) {
    case BACKWARD_NEWER_CHECKPOINT_END:
392
        renv->ss.ss = BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END;
393
        renv->ss.checkpoint_begin_lsn.lsn = l->lsn_begin_checkpoint.lsn;
394
        renv->ss.checkpoint_end_lsn.lsn   = l->lsn.lsn;
395
        renv->ss.checkpoint_end_timestamp = l->timestamp;
396
        return 0;
397
    case BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END:
398 399
        fprintf(stderr, "Tokudb recovery %s:%d Should not see two end_checkpoint log entries without an intervening begin_checkpoint\n", __FILE__, __LINE__);
        abort();
400 401 402 403 404 405 406 407
    default:
        break;
    }
    fprintf(stderr, "Tokudb recovery %s: %d Unknown checkpoint state %d\n", __FILE__, __LINE__, (int)renv->ss.ss);
    abort();
}

static int toku_recover_fassociate (struct logtype_fassociate *l, RECOVER_ENV renv) {
408
    struct file_map_tuple *tuple = NULL;
409 410 411 412
    int r = file_map_find(&renv->fmap, l->filenum, &tuple);
    char *fname = fixup_fname(&l->iname);
    switch (renv->ss.ss) {
    case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END:
413
        renv->ss.checkpoint_num_fassociate++;
414
        assert(r==DB_NOTFOUND); //Not open
415
        // Open it if it exists.
416 417
        // If rollback file, specify which checkpointed version of file we need (not just the latest)
        // because we cannot use a rollback log that is later than the last complete checkpoint.  See #3113.
418
        {
Yoni Fogel's avatar
Yoni Fogel committed
419
            bool rollback_file = (0==strcmp(fname, ROLLBACK_CACHEFILE_NAME));
420
            LSN max_acceptable_lsn = MAX_LSN;
421
            if (rollback_file) {
422
                max_acceptable_lsn = renv->ss.checkpoint_begin_lsn;
423
                FT_HANDLE t;
424
                toku_ft_handle_create(&t);
425 426
                r = toku_ft_handle_open_recovery(t, ROLLBACK_CACHEFILE_NAME, false, false, renv->ct, (TOKUTXN)NULL, l->filenum, max_acceptable_lsn);
                renv->logger->rollback_cachefile = t->ft->cf;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
427
                toku_logger_initialize_rollback_cache(renv->logger, t->ft);
428
            } else {
Yoni Fogel's avatar
Yoni Fogel committed
429
                r = internal_recover_fopen_or_fcreate(renv, false, 0, &l->iname, l->filenum, l->treeflags, NULL, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, max_acceptable_lsn);
430 431
                assert(r==0);
            }
432
        }
433 434
        // try to open the file again and if we get it, restore
        // the unlink on close bit.
435 436
        int ret;
        ret = file_map_find(&renv->fmap, l->filenum, &tuple);
437 438 439
        if (ret == 0 && l->unlink_on_close) {
            toku_cachefile_unlink_on_close(tuple->ft_handle->ft->cf);
        }
440 441 442 443 444 445 446 447 448 449 450 451 452
        break;
    case FORWARD_NEWER_CHECKPOINT_END:
        if (r == 0) { //IF it is open
            // assert that the filenum maps to the correct iname
            assert(strcmp(fname, tuple->iname) == 0);
        }
        r = 0;
        break;
    default:
        assert(0);
        return 0;
    }
    toku_free(fname);
453

454 455
    return r;
}
456

457 458 459 460
// fassociate entries need no processing in the backward direction:
// both arguments are unused and the result is always 0.
static int toku_recover_backward_fassociate (struct logtype_fassociate *UU(l),
                                             RECOVER_ENV UU(renv)) {
    return 0;   // intentionally empty
}
461

462
static int
463
recover_transaction(TOKUTXN *txnp, TXNID_PAIR xid, TXNID_PAIR parentxid, TOKULOGGER logger) {
464 465 466 467
    int r;

    // lookup the parent
    TOKUTXN parent = NULL;
468
    if (!txn_pair_is_none(parentxid)) {
469
        toku_txnid2txn(logger, parentxid, &parent);
470 471
        assert(parent!=NULL);
    }
472 473 474
    else {
        invariant(xid.child_id64 == TXNID_NONE);
    }
475 476 477 478 479

    // create a transaction and bind it to the transaction id
    TOKUTXN txn = NULL;
    {
        //Verify it does not yet exist.
480
        toku_txnid2txn(logger, xid, &txn);
481 482
        assert(txn==NULL);
    }
483
    r = toku_txn_begin_with_xid(parent, &txn, logger, xid, TXN_SNAPSHOT_NONE, NULL, true);
484
    assert(r == 0);
485 486 487
    // We only know about it because it was logged.  Restore the log bit.
    // Logging is 'off' but it will still set the bit.
    toku_maybe_log_begin_txn_for_write_operation(txn);
488 489 490 491
    if (txnp) *txnp = txn;
    return 0;
}

492
static int recover_xstillopen_internal (TOKUTXN         *txnp,
493
                                        LSN           UU(lsn),
494 495
                                        TXNID_PAIR       xid,
                                        TXNID_PAIR       parentxid,
Yoni Fogel's avatar
Yoni Fogel committed
496
                                        uint64_t        rollentry_raw_count,
497
                                        FILENUMS         open_filenums,
498
                                        bool             force_fsync_on_commit,
Yoni Fogel's avatar
Yoni Fogel committed
499 500
                                        uint64_t        num_rollback_nodes,
                                        uint64_t        num_rollentries,
501 502 503
                                        BLOCKNUM         spilled_rollback_head,
                                        BLOCKNUM         spilled_rollback_tail,
                                        BLOCKNUM         current_rollback,
Yoni Fogel's avatar
Yoni Fogel committed
504 505
                                        uint32_t     UU(crc),
                                        uint32_t     UU(len),
506
                                        RECOVER_ENV      renv) {
507
    int r;
508
    *txnp = NULL;
509 510
    switch (renv->ss.ss) {
    case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: {
511
        renv->ss.checkpoint_num_xstillopen++;
512
        invariant(renv->ss.last_xid != TXNID_NONE);
513
        invariant(xid.parent_id64 <= renv->ss.last_xid);
514 515
        TOKUTXN txn = NULL;
        { //Create the transaction.
516
            r = recover_transaction(&txn, xid, parentxid, renv->logger);
517 518
            assert(r==0);
            assert(txn!=NULL);
519
            *txnp = txn;
520 521
        }
        { //Recover rest of transaction.
522
#define COPY_TO_INFO(field) .field = field
523 524
            struct txninfo info = {
                COPY_TO_INFO(rollentry_raw_count),
525 526
                .num_fts  = 0,    //Set afterwards
                .open_fts = NULL, //Set afterwards
527 528 529 530 531 532 533 534
                COPY_TO_INFO(force_fsync_on_commit),
                COPY_TO_INFO(num_rollback_nodes),
                COPY_TO_INFO(num_rollentries),
                COPY_TO_INFO(spilled_rollback_head),
                COPY_TO_INFO(spilled_rollback_tail),
                COPY_TO_INFO(current_rollback)
            };
#undef COPY_TO_INFO
535 536 537
            //Generate open_fts
            FT array[open_filenums.num]; //Allocate maximum possible requirement
            info.open_fts = array;
538
            uint32_t i;
539
            for (i = 0; i < open_filenums.num; i++) {
540 541
                //open_filenums.filenums[]
                struct file_map_tuple *tuple = NULL;
542
                r = file_map_find(&renv->fmap, open_filenums.filenums[i], &tuple);
543
                if (r==0) {
544
                    info.open_fts[info.num_fts++] = tuple->ft_handle->ft;
545 546 547 548 549 550 551 552 553
                }
                else {
                    assert(r==DB_NOTFOUND);
                }
            }
            r = toku_txn_load_txninfo(txn, &info);
            assert(r==0);
        }
        break;
Rich Prohaska's avatar
Rich Prohaska committed
554
    }
555 556 557
    case FORWARD_NEWER_CHECKPOINT_END: {
        // assert that the transaction exists
        TOKUTXN txn = NULL;
558
        toku_txnid2txn(renv->logger, xid, &txn);
559
        r = 0;
560
        *txnp = txn;
561 562 563 564 565 566 567 568 569
        break;
    }
    default:
        assert(0);
        return 0;
    }
    return r;
}

570 571 572
// Handle an xstillopen log entry by forwarding every field to
// recover_xstillopen_internal.  The restored transaction pointer is discarded.
// Returns the helper's result.
static int toku_recover_xstillopen (struct logtype_xstillopen *l, RECOVER_ENV renv) {
    TOKUTXN txn;
    return recover_xstillopen_internal (&txn,
                                        l->lsn,
                                        l->xid,
                                        l->parentxid,
                                        l->rollentry_raw_count,
                                        l->open_filenums,
                                        l->force_fsync_on_commit,
                                        l->num_rollback_nodes,
                                        l->num_rollentries,
                                        l->spilled_rollback_head,
                                        l->spilled_rollback_tail,
                                        l->current_rollback,
                                        l->crc,
                                        l->len,
                                        renv);
}

static int toku_recover_xstillopenprepared (struct logtype_xstillopenprepared *l, RECOVER_ENV renv) {
    TOKUTXN txn;
    int r = recover_xstillopen_internal (&txn,
592 593
                                         l->lsn,
                                         l->xid,
594
                                         TXNID_PAIR_NONE,
595 596 597 598 599 600 601 602 603 604 605
                                         l->rollentry_raw_count,
                                         l->open_filenums,
                                         l->force_fsync_on_commit,
                                         l->num_rollback_nodes,
                                         l->num_rollentries,
                                         l->spilled_rollback_head,
                                         l->spilled_rollback_tail,
                                         l->current_rollback,
                                         l->crc,
                                         l->len,
                                         renv);
606 607 608 609 610
    if (r != 0) {
        goto exit;
    }
    switch (renv->ss.ss) {
        case FORWARD_BETWEEN_CHECKPOINT_BEGIN_END: {
611
            toku_txn_prepare_txn(txn, l->xa_xid);
612 613 614 615 616 617 618 619 620 621 622 623
            break;
        }
        case FORWARD_NEWER_CHECKPOINT_END: {
            assert(txn->state == TOKUTXN_PREPARING);
            break;
        }
        default: {
            assert(0);
        }
    }
exit:
    return r;
624 625
}

626 627 628 629
// xstillopen entries need no processing in the backward direction:
// both arguments are unused and the result is always 0.
static int toku_recover_backward_xstillopen (struct logtype_xstillopen *UU(l),
                                             RECOVER_ENV UU(renv)) {
    return 0;   // intentionally empty
}
630 631 632 633
// xstillopenprepared entries need no processing in the backward direction:
// both arguments are unused and the result is always 0.
static int toku_recover_backward_xstillopenprepared (struct logtype_xstillopenprepared *UU(l),
                                                     RECOVER_ENV UU(renv)) {
    return 0;   // intentionally empty
}
634 635 636

static int toku_recover_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) {
    int r;
637
    r = recover_transaction(NULL, l->xid, l->parentxid, renv->logger);
638 639 640 641 642 643 644 645 646 647 648
    return r;
}

// xbegin entries need no processing in the backward direction:
// both arguments are unused and the result is always 0.
static int toku_recover_backward_xbegin (struct logtype_xbegin *UU(l),
                                         RECOVER_ENV UU(renv)) {
    return 0;   // intentionally empty
}

static int toku_recover_xcommit (struct logtype_xcommit *l, RECOVER_ENV renv) {
    // find the transaction by transaction id
    TOKUTXN txn = NULL;
649
    toku_txnid2txn(renv->logger, l->xid, &txn);
650 651 652
    assert(txn!=NULL);

    // commit the transaction
653
    int r = toku_txn_commit_with_lsn(txn, true, l->lsn,
654
                                 NULL, NULL);
655 656 657 658 659 660 661 662 663 664 665 666 667
    assert(r == 0);

    // close the transaction
    toku_txn_close_txn(txn);

    return 0;
}

// xcommit entries need no processing in the backward direction:
// both arguments are unused and the result is always 0.
static int toku_recover_backward_xcommit (struct logtype_xcommit *UU(l),
                                          RECOVER_ENV UU(renv)) {
    return 0;   // intentionally empty
}

668 669 670
static int toku_recover_xprepare (struct logtype_xprepare *l, RECOVER_ENV renv) {
    // find the transaction by transaction id
    TOKUTXN txn = NULL;
671
    toku_txnid2txn(renv->logger, l->xid, &txn);
672 673 674
    assert(txn!=NULL);

    // Save the transaction
675
    toku_txn_prepare_txn(txn, l->xa_xid);
676 677 678 679 680 681 682 683 684 685 686

    return 0;
}

// xprepare entries need no processing in the backward direction:
// both arguments are unused and the result is always 0.
static int toku_recover_backward_xprepare (struct logtype_xprepare *UU(l),
                                           RECOVER_ENV UU(renv)) {
    return 0;   // intentionally empty
}



687 688 689 690 691
static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) {
    int r;

    // find the transaction by transaction id
    TOKUTXN txn = NULL;
692
    toku_txnid2txn(renv->logger, l->xid, &txn);
693 694 695
    assert(txn!=NULL);

    // abort the transaction
696
    r = toku_txn_abort_with_lsn(txn, l->lsn, NULL, NULL);
697 698 699 700 701 702 703 704 705 706
    assert(r == 0);

    // close the transaction
    toku_txn_close_txn(txn);

    return 0;
}

// xabort entries need no processing in the backward direction:
// both arguments are unused and the result is always 0.
static int toku_recover_backward_xabort (struct logtype_xabort *UU(l), RECOVER_ENV UU(renv)) {
    // nothing
    return 0;
}

710
// fcreate is like fopen except that the file must be created.
711
static int toku_recover_fcreate (struct logtype_fcreate *l, RECOVER_ENV renv) {
712
    int r;
713

714
    TOKUTXN txn = NULL;
715
    toku_txnid2txn(renv->logger, l->xid, &txn);
716

717 718 719
    // assert that filenum is closed
    struct file_map_tuple *tuple = NULL;
    r = file_map_find(&renv->fmap, l->filenum, &tuple);
720
    assert(r==DB_NOTFOUND);
721

722
    assert(txn!=NULL);
723

724 725
    //unlink if it exists (recreate from scratch).
    char *iname = fixup_fname(&l->iname);
726 727
    char *iname_in_cwd = toku_cachetable_get_fname_in_cwd(renv->ct, iname);
    r = unlink(iname_in_cwd);
728 729 730 731 732 733 734
    if (r != 0) {
        int er = get_error_errno();
        if (er != ENOENT) {
            fprintf(stderr, "Tokudb recovery %s:%d unlink %s %d\n", __FUNCTION__, __LINE__, iname, er);
            toku_free(iname);
            return r;
        }
735
    }
736
    assert(0!=strcmp(iname, ROLLBACK_CACHEFILE_NAME)); //Creation of rollback cachefile never gets logged.
737
    toku_free(iname_in_cwd);
738
    toku_free(iname);
739

Yoni Fogel's avatar
Yoni Fogel committed
740
    bool must_create = true;
741
    r = internal_recover_fopen_or_fcreate(renv, must_create, l->mode, &l->iname, l->filenum, l->treeflags, txn, l->nodesize, l->basementnodesize, (enum toku_compression_method) l->compression_method, MAX_LSN);
742
    return r;
743 744
}

745 746
// fcreate entries need no processing in the backward direction:
// both arguments are unused and the result is always 0.
static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER_ENV UU(renv)) {
    // nothing
    return 0;
}
749

750 751


752
static int toku_recover_fopen (struct logtype_fopen *l, RECOVER_ENV renv) {
753 754 755
    int r;

    // assert that filenum is closed
756
    struct file_map_tuple *tuple = NULL;
757 758 759
    r = file_map_find(&renv->fmap, l->filenum, &tuple);
    assert(r==DB_NOTFOUND);

Yoni Fogel's avatar
Yoni Fogel committed
760
    bool must_create = false;
761 762 763
    TOKUTXN txn = NULL;
    char *fname = fixup_fname(&l->iname);

764
    assert(0!=strcmp(fname, ROLLBACK_CACHEFILE_NAME)); //Rollback cachefile can be opened only via fassociate.
765
    r = internal_recover_fopen_or_fcreate(renv, must_create, 0, &l->iname, l->filenum, l->treeflags, txn, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, MAX_LSN);
766

767
    toku_free(fname);
768 769 770
    return r;
}

771 772
// Backward-scan handler for fopen log entries: there is no work to do, so it always succeeds.
static int toku_recover_backward_fopen (struct logtype_fopen *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}

776
static int toku_recover_change_fdescriptor (struct logtype_change_fdescriptor *l, RECOVER_ENV renv) {
777 778 779 780
    int r;
    struct file_map_tuple *tuple = NULL;
    r = file_map_find(&renv->fmap, l->filenum, &tuple);
    if (r==0) {
781
        TOKUTXN txn = NULL;
782
        //Maybe do the descriptor (lsn filter)
783
        toku_txnid2txn(renv->logger, l->xid, &txn);
784 785 786 787 788 789 790 791 792 793 794
        DBT old_descriptor, new_descriptor;
        toku_fill_dbt(
            &old_descriptor, 
            l->old_descriptor.data, 
            l->old_descriptor.len
            );
        toku_fill_dbt(
            &new_descriptor, 
            l->new_descriptor.data, 
            l->new_descriptor.len
            );
795
        toku_ft_change_descriptor(
796
            tuple->ft_handle, 
797 798
            &old_descriptor, 
            &new_descriptor, 
Yoni Fogel's avatar
Yoni Fogel committed
799
            false, 
800 801
            txn,
            l->update_cmp_descriptor
802
            );
803 804 805 806
    }    
    return 0;
}

807
// Backward-scan handler for change_fdescriptor log entries: there is no work to do, so it always succeeds.
static int toku_recover_backward_change_fdescriptor (struct logtype_change_fdescriptor *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}


812
// if file referred to in l is open, close it
813 814 815
static int toku_recover_fclose (struct logtype_fclose *l, RECOVER_ENV renv) {
    struct file_map_tuple *tuple = NULL;
    int r = file_map_find(&renv->fmap, l->filenum, &tuple);
816
    if (r == 0) {  // if file is open
817
        char *iname = fixup_fname(&l->iname);
818
        assert(strcmp(tuple->iname, iname) == 0);  // verify that file_map has same iname as log entry
819

820
        if (0!=strcmp(iname, ROLLBACK_CACHEFILE_NAME)) {
821
            //Rollback cachefile is closed manually at end of recovery, not here
822
            toku_ft_handle_close_recovery(tuple->ft_handle, l->lsn);
823
        }
824
        file_map_remove(&renv->fmap, l->filenum);
825
        toku_free(iname);
826 827 828 829 830 831 832 833 834
    }
    return 0;
}

// Backward-scan handler for fclose log entries: there is no work to do, so it always succeeds.
static int toku_recover_backward_fclose (struct logtype_fclose *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}

835
// fdelete is a transactional file delete.
836 837
static int toku_recover_fdelete (struct logtype_fdelete *l, RECOVER_ENV renv) {
    TOKUTXN txn = NULL;
838
    toku_txnid2txn(renv->logger, l->xid, &txn);
839
    assert(txn != NULL);
840 841 842 843 844 845

    // if the forward scan in recovery found this file and opened it, we
    // need to mark the txn to remove the ft on commit. if the file was
    // not found and not opened, we don't need to do anything - the ft
    // is already gone, so we're happy.
    struct file_map_tuple *tuple;
846
    int r = file_map_find(&renv->fmap, l->filenum, &tuple);
847
    if (r == 0) {
848
        toku_ft_unlink_on_commit(tuple->ft_handle, txn);
849 850
    }
    return 0;
851 852 853
}

// Backward-scan handler for fdelete log entries: there is no work to do, so it always succeeds.
static int toku_recover_backward_fdelete (struct logtype_fdelete *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}

858 859 860
static int toku_recover_enq_insert (struct logtype_enq_insert *l, RECOVER_ENV renv) {
    int r;
    TOKUTXN txn = NULL;
861
    toku_txnid2txn(renv->logger, l->xid, &txn);
862
    assert(txn!=NULL);
863
    struct file_map_tuple *tuple = NULL;
864
    r = file_map_find(&renv->fmap, l->filenum, &tuple);
865 866 867 868 869
    if (r==0) {
        //Maybe do the insertion if we found the cachefile.
        DBT keydbt, valdbt;
        toku_fill_dbt(&keydbt, l->key.data, l->key.len);
        toku_fill_dbt(&valdbt, l->value.data, l->value.len);
870
        toku_ft_maybe_insert(tuple->ft_handle, &keydbt, &valdbt, txn, true, l->lsn, false, FT_INSERT);
871 872
        toku_txn_maybe_note_ft(txn, tuple->ft_handle->ft);
    }
873 874 875
    return 0;
}

876
// Backward-scan handler for enq_insert log entries: there is no work to do, so it always succeeds.
static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}

881 882 883
static int toku_recover_enq_insert_no_overwrite (struct logtype_enq_insert_no_overwrite *l, RECOVER_ENV renv) {
    int r;
    TOKUTXN txn = NULL;
884
    toku_txnid2txn(renv->logger, l->xid, &txn);
885
    assert(txn!=NULL);
886 887
    struct file_map_tuple *tuple = NULL;
    r = file_map_find(&renv->fmap, l->filenum, &tuple);
888 889 890 891 892
    if (r==0) {
        //Maybe do the insertion if we found the cachefile.
        DBT keydbt, valdbt;
        toku_fill_dbt(&keydbt, l->key.data, l->key.len);
        toku_fill_dbt(&valdbt, l->value.data, l->value.len);
893
        toku_ft_maybe_insert(tuple->ft_handle, &keydbt, &valdbt, txn, true, l->lsn, false, FT_INSERT_NO_OVERWRITE);
894 895 896 897 898 899 900 901 902
    }    
    return 0;
}

// Backward-scan handler for enq_insert_no_overwrite log entries: there is no work to do, so it always succeeds.
static int toku_recover_backward_enq_insert_no_overwrite (struct logtype_enq_insert_no_overwrite *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}

903
static int toku_recover_enq_delete_any (struct logtype_enq_delete_any *l, RECOVER_ENV renv) {
904
    int r;
905
    TOKUTXN txn = NULL;
906
    toku_txnid2txn(renv->logger, l->xid, &txn);
907
    assert(txn!=NULL);
908 909
    struct file_map_tuple *tuple = NULL;
    r = file_map_find(&renv->fmap, l->filenum, &tuple);
910 911 912 913
    if (r==0) {
        //Maybe do the deletion if we found the cachefile.
        DBT keydbt;
        toku_fill_dbt(&keydbt, l->key.data, l->key.len);
914
        toku_ft_maybe_delete(tuple->ft_handle, &keydbt, txn, true, l->lsn, false);
915
    }    
916
    return 0;
917 918
}

919
// Backward-scan handler for enq_delete_any log entries: there is no work to do, so it always succeeds.
static int toku_recover_backward_enq_delete_any (struct logtype_enq_delete_any *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}
923

924 925 926
static int toku_recover_enq_insert_multiple (struct logtype_enq_insert_multiple *l, RECOVER_ENV renv) {
    int r;
    TOKUTXN txn = NULL;
927
    toku_txnid2txn(renv->logger, l->xid, &txn);
928 929
    assert(txn!=NULL);
    DB *src_db = NULL;
Yoni Fogel's avatar
Yoni Fogel committed
930
    bool do_inserts = true;
931
    {
932
        struct file_map_tuple *tuple = NULL;
933 934 935 936
        r = file_map_find(&renv->fmap, l->src_filenum, &tuple);
        if (l->src_filenum.fileid == FILENUM_NONE.fileid)
            assert(r==DB_NOTFOUND);
        else {
937
            if (r == 0)
938
                src_db = &tuple->fake_db;
939
            else
Yoni Fogel's avatar
Yoni Fogel committed
940
                do_inserts = false; // src file was probably deleted, #3129
941 942
        }
    }
943 944 945 946 947 948 949 950 951 952 953 954 955
    
    if (do_inserts) {
        DBT src_key, src_val, dest_key, dest_val;
        toku_fill_dbt(&src_key, l->src_key.data, l->src_key.len);
        toku_fill_dbt(&src_val, l->src_val.data, l->src_val.len);
        toku_init_dbt_flags(&dest_key, DB_DBT_REALLOC);
        toku_init_dbt_flags(&dest_val, DB_DBT_REALLOC);

        for (uint32_t file = 0; file < l->dest_filenums.num; file++) {
            struct file_map_tuple *tuple = NULL;
            r = file_map_find(&renv->fmap, l->dest_filenums.filenums[file], &tuple);
            if (r==0) {
                // We found the cachefile.  (maybe) Do the insert.
956
                DB *db = &tuple->fake_db;
957 958
                r = renv->generate_row_for_put(db, src_db, &dest_key, &dest_val, &src_key, &src_val);
                assert(r==0);
959
                toku_ft_maybe_insert(tuple->ft_handle, &dest_key, &dest_val, txn, true, l->lsn, false, FT_INSERT);
960

961 962 963 964 965 966 967 968 969
                //flags==0 means generate_row_for_put callback changed it
                //(and freed any memory necessary to do so) so that values are now stored
                //in temporary memory that does not need to be freed.  We need to continue
                //using DB_DBT_REALLOC however.
                if (dest_key.flags == 0) 
                    toku_init_dbt_flags(&dest_key, DB_DBT_REALLOC);
                if (dest_val.flags == 0)
                    toku_init_dbt_flags(&dest_val, DB_DBT_REALLOC);
            }
970
        }
971 972 973

        if (dest_key.data) toku_free(dest_key.data); //TODO: #2321 May need windows hack
        if (dest_val.data) toku_free(dest_val.data); //TODO: #2321 May need windows hack
974
    }
975 976 977 978

    return 0;
}

979
// Backward-scan handler for enq_insert_multiple log entries: there is no work to do, so it always succeeds.
static int toku_recover_backward_enq_insert_multiple (struct logtype_enq_insert_multiple *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}

984
static int toku_recover_enq_delete_multiple (struct logtype_enq_delete_multiple *l, RECOVER_ENV renv) {
985 986
    int r;
    TOKUTXN txn = NULL;
987
    toku_txnid2txn(renv->logger, l->xid, &txn);
988 989
    assert(txn!=NULL);
    DB *src_db = NULL;
Yoni Fogel's avatar
Yoni Fogel committed
990
    bool do_deletes = true;
991 992 993 994 995 996
    {
        struct file_map_tuple *tuple = NULL;
        r = file_map_find(&renv->fmap, l->src_filenum, &tuple);
        if (l->src_filenum.fileid == FILENUM_NONE.fileid)
            assert(r==DB_NOTFOUND);
        else {
997
            if (r == 0)
998
                src_db = &tuple->fake_db;
999
            else
Yoni Fogel's avatar
Yoni Fogel committed
1000
                do_deletes = false; // src file was probably deleted, #3129
1001
        }
1002
    }
1003

1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014
    if (do_deletes) {
        DBT src_key, src_val, dest_key;
        toku_fill_dbt(&src_key, l->src_key.data, l->src_key.len);
        toku_fill_dbt(&src_val, l->src_val.data, l->src_val.len);
        toku_init_dbt_flags(&dest_key, DB_DBT_REALLOC);

        for (uint32_t file = 0; file < l->dest_filenums.num; file++) {
            struct file_map_tuple *tuple = NULL;
            r = file_map_find(&renv->fmap, l->dest_filenums.filenums[file], &tuple);
            if (r==0) {
                // We found the cachefile.  (maybe) Do the delete.
1015
                DB *db = &tuple->fake_db;
1016 1017
                r = renv->generate_row_for_del(db, src_db, &dest_key, &src_key, &src_val);
                assert(r==0);
1018
                toku_ft_maybe_delete(tuple->ft_handle, &dest_key, txn, true, l->lsn, false);
1019

1020 1021 1022 1023 1024
                //flags==0 indicates the return values are stored in temporary memory that does
                //not need to be freed.  We need to continue using DB_DBT_REALLOC however.
                if (dest_key.flags == 0)
                    toku_init_dbt_flags(&dest_key, DB_DBT_REALLOC);
            }
1025
        }
1026 1027
        
        if (dest_key.flags & DB_DBT_REALLOC && dest_key.data) toku_free(dest_key.data); //TODO: #2321 May need windows hack
1028
    }
1029 1030 1031 1032

    return 0;
}

1033
// Backward-scan handler for enq_delete_multiple log entries: there is no work to do, so it always succeeds.
static int toku_recover_backward_enq_delete_multiple (struct logtype_enq_delete_multiple *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}

1038 1039 1040
static int toku_recover_enq_update(struct logtype_enq_update *l, RECOVER_ENV renv) {
    int r;
    TOKUTXN txn = NULL;
1041
    toku_txnid2txn(renv->logger, l->xid, &txn);
1042 1043 1044 1045 1046 1047 1048 1049
    assert(txn != NULL);
    struct file_map_tuple *tuple = NULL;
    r = file_map_find(&renv->fmap, l->filenum, &tuple);
    if (r == 0) {
        // Maybe do the update if we found the cachefile.
        DBT key, extra;
        toku_fill_dbt(&key, l->key.data, l->key.len);
        toku_fill_dbt(&extra, l->extra.data, l->extra.len);
1050
        toku_ft_maybe_update(tuple->ft_handle, &key, &extra, txn, true, l->lsn, false);
1051 1052 1053 1054 1055 1056 1057
    }
    return 0;
}

static int toku_recover_enq_updatebroadcast(struct logtype_enq_updatebroadcast *l, RECOVER_ENV renv) {
    int r;
    TOKUTXN txn = NULL;
1058
    toku_txnid2txn(renv->logger, l->xid, &txn);
1059 1060 1061 1062 1063 1064 1065
    assert(txn != NULL);
    struct file_map_tuple *tuple = NULL;
    r = file_map_find(&renv->fmap, l->filenum, &tuple);
    if (r == 0) {
        // Maybe do the update broadcast if we found the cachefile.
        DBT extra;
        toku_fill_dbt(&extra, l->extra.data, l->extra.len);
1066
        toku_ft_maybe_update_broadcast(tuple->ft_handle, &extra, txn, true,
Yoni Fogel's avatar
Yoni Fogel committed
1067
                                            l->lsn, false, l->is_resetting_op);
1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081
    }
    return 0;
}

// Backward-scan handler for enq_update log entries: there is no work to do, so it always succeeds.
static int toku_recover_backward_enq_update(struct logtype_enq_update *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}

// Backward-scan handler for enq_updatebroadcast log entries: there is no work to do, so it always succeeds.
static int toku_recover_backward_enq_updatebroadcast(struct logtype_enq_updatebroadcast *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}

1082
// Forward-scan handler for comment log entries: there is no work to do, so it always succeeds.
static int toku_recover_comment (struct logtype_comment *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}

1087
// Backward-scan handler for comment log entries: there is no work to do, so it always succeeds.
static int toku_recover_backward_comment (struct logtype_comment *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}

1092 1093 1094 1095 1096 1097 1098 1099 1100 1101
// Forward-scan handler for shutdown_up_to_19 log entries: there is no work to do, so it always succeeds.
static int toku_recover_shutdown_up_to_19 (struct logtype_shutdown_up_to_19 *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}

// Backward-scan handler for shutdown_up_to_19 log entries: there is no work to do, so it always succeeds.
static int toku_recover_backward_shutdown_up_to_19 (struct logtype_shutdown_up_to_19 *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}

1102 1103 1104 1105 1106 1107 1108 1109 1110 1111
// Forward-scan handler for shutdown log entries: there is no work to do, so it always succeeds.
static int toku_recover_shutdown (struct logtype_shutdown *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}

// Backward-scan handler for shutdown log entries: there is no work to do, so it always succeeds.
static int toku_recover_backward_shutdown (struct logtype_shutdown *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}

1112
static int toku_recover_load(struct logtype_load *UU(l), RECOVER_ENV UU(renv)) {
Yoni Fogel's avatar
Yoni Fogel committed
1113
    TOKUTXN txn = NULL;
1114
    toku_txnid2txn(renv->logger, l->xid, &txn);
1115
    assert(txn!=NULL);
Yoni Fogel's avatar
Yoni Fogel committed
1116 1117
    char *new_iname = fixup_fname(&l->new_iname);

1118
    toku_ft_load_recovery(txn, l->old_filenum, new_iname, 0, 0, (LSN*)NULL);
Yoni Fogel's avatar
Yoni Fogel committed
1119 1120

    toku_free(new_iname);
1121 1122 1123 1124 1125
    return 0;
}

// Backward-scan handler for load log entries: there is no work to do, so it always succeeds.
static int toku_recover_backward_load(struct logtype_load *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}

1129 1130 1131
// #2954
static int toku_recover_hot_index(struct logtype_hot_index *UU(l), RECOVER_ENV UU(renv)) {
    TOKUTXN txn = NULL;
1132
    toku_txnid2txn(renv->logger, l->xid, &txn);
1133 1134 1135
    assert(txn!=NULL);
    // just make an entry in the rollback log 
    //   - set do_log = 0 -> don't write to recovery log
1136
    toku_ft_hot_index_recovery(txn, l->hot_index_filenums, 0, 0, (LSN*)NULL);
1137 1138 1139 1140 1141 1142 1143 1144 1145
    return 0;
}

// #2954
// Backward-scan handler for hot_index log entries: there is no work to do, so it always succeeds.
static int toku_recover_backward_hot_index(struct logtype_hot_index *UU(l), RECOVER_ENV UU(renv)) {
    return 0;
}

1146 1147
// Effects: If there are no log files, or if there is a clean "shutdown" at
// the end of the log, then we don't need recovery to run.
Yoni Fogel's avatar
Yoni Fogel committed
1148 1149
// Returns: true if we need recovery, otherwise false.
int tokudb_needs_recovery(const char *log_dir, bool ignore_log_empty) {
1150
    int needs_recovery;
1151
    int r;
1152
    TOKULOGCURSOR logcursor = NULL;
1153

1154 1155
    r = toku_logcursor_create(&logcursor, log_dir);
    if (r != 0) {
Yoni Fogel's avatar
Yoni Fogel committed
1156
        needs_recovery = true; goto exit;
1157 1158
    }
    
1159 1160
    struct log_entry *le;
    le = NULL;
1161
    r = toku_logcursor_last(logcursor, &le);
1162 1163
    if (r == 0) {
        needs_recovery = le->cmd != LT_shutdown;
1164
    }
1165 1166
    else {
        needs_recovery = !(r == DB_NOTFOUND && ignore_log_empty);
1167 1168 1169 1170 1171
    }
 exit:
    if (logcursor) {
        r = toku_logcursor_destroy(&logcursor);
        assert(r == 0);
1172
    }
1173 1174 1175
    return needs_recovery;
}

1176
// Returns the number of live root transactions reported by the logger's txn manager.
static uint32_t recover_get_num_live_txns(RECOVER_ENV renv) {
    uint32_t num_live = toku_txn_manager_num_live_root_txns(renv->logger->txn_manager);
    return num_live;
}

1180 1181
// Iterator callback used by find_an_unprepared_txn.
// extra points at a TOKUTXN slot. For the first transaction whose state is
// not TOKUTXN_PREPARING, stores it in that slot and returns -1 so that the
// iteration stops; returns 0 to keep iterating otherwise.
static int is_txn_unprepared(TOKUTXN txn, void* extra) {
    if (txn->state == TOKUTXN_PREPARING) {
        return 0; // prepared: keep looking
    }
    TOKUTXN *result_slot = (TOKUTXN *)extra;
    *result_slot = txn;
    return -1; // return -1 to get iterator to return
}


1190
// Searches the live root transactions for one that is not in the PREPARING state.
// On success stores it in *txnp and returns 0; returns DB_NOTFOUND (leaving
// *txnp untouched) when every live root transaction is prepared or none exist.
static int find_an_unprepared_txn (RECOVER_ENV renv, TOKUTXN *txnp) {
    TOKUTXN found = nullptr;
    int r = toku_txn_manager_iter_over_live_root_txns(renv->logger->txn_manager, is_txn_unprepared, &found);
    // the callback returns either 0 (keep going) or -1 (stop)
    assert(r == 0 || r == -1);
    if (found == nullptr) {
        return DB_NOTFOUND;
    }
    *txnp = found;
    return 0;
}

1205 1206 1207 1208
// Iterator callback used by recover_abort_all_live_txns.
// extra points at a RECOVER_ENV. Checks that txn is PREPARING and has no
// child, then invokes the environment's prepared_txn_callback on it.
// Always returns 0 so that the iteration continues.
static int call_prepare_txn_callback_iter(TOKUTXN txn, void* extra) {
    RECOVER_ENV env_for_cb = *(RECOVER_ENV *)extra;
    invariant(txn->state == TOKUTXN_PREPARING);
    invariant(txn->child == NULL);
    env_for_cb->prepared_txn_callback(env_for_cb->env, txn);
    return 0;
}

1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227
// Aborts and closes txn, after first doing the same (recursively) for its child, if any.
static void recover_abort_live_txn(TOKUTXN txn) {
    // children go first
    if (txn->child != NULL) {
        recover_abort_live_txn(txn->child);
    }
    // the recursive call must have NULLed out txn->child
    invariant(txn->child == NULL);

    // abort, then close, this transaction
    int abort_r = toku_txn_abort_txn(txn, NULL, NULL);
    assert(abort_r == 0);
    toku_txn_close_txn(txn);
}

1228
// abort all of the remaining live transactions in descending transaction id order
1229
static void recover_abort_all_live_txns(RECOVER_ENV renv) {
1230
    while (1) {
1231 1232 1233
        TOKUTXN txn;
        int r = find_an_unprepared_txn(renv, &txn);
        if (r==0) {
1234
            recover_abort_live_txn(txn);
1235 1236 1237 1238 1239
        } else if (r==DB_NOTFOUND) {
            break;
        } else {
            abort();
        }
1240
    }
1241

1242
    // Now we have only prepared txns.  These prepared txns don't have full DB_TXNs in them, so we need to make some.
1243
    int r = toku_txn_manager_iter_over_live_root_txns(
1244
        renv->logger->txn_manager,
1245
        call_prepare_txn_callback_iter,
1246
        &renv
1247 1248
        );
    assert_zero(r);
1249 1250
}

1251 1252 1253
static void recover_trace_le(const char *f, int l, int r, struct log_entry *le) {
    if (le) {
        LSN thislsn = toku_log_entry_get_lsn(le);
1254
        fprintf(stderr, "%s:%d r=%d cmd=%c lsn=%" PRIu64 "\n", f, l, r, le->cmd, thislsn.lsn);
1255 1256 1257 1258
    } else
        fprintf(stderr, "%s:%d r=%d cmd=?\n", f, l, r);
}

1259 1260 1261 1262 1263 1264 1265
// For test purposes only.
// Optional hooks invoked by do_recovery; all are NULL (disabled) by default.
// recover_callback_fx is called with recover_callback_args after the backward
// scan; it is installed by toku_recover_set_callback.
static void (*recover_callback_fx)(void*)  = NULL;
static void * recover_callback_args        = NULL;
// recover_callback2_fx is called with recover_callback2_args after the forward
// scan, once the log cursor has been destroyed; it is installed by
// toku_recover_set_callback2.
static void (*recover_callback2_fx)(void*) = NULL;
static void * recover_callback2_args       = NULL;


1266
// Runs recovery for the environment env_dir using the log in log_dir.
//
// Steps, in order:
//   1. open the logger and remember its last LSN;
//   2. check that the log has at least one entry and that env_dir is a directory;
//   3. scan the log backward, dispatching toku_recover_backward_* handlers,
//      until a handler sets renv->goforward or the beginning of the log is hit;
//   4. scan the log forward from the turnaround entry, dispatching
//      toku_recover_* handlers, until the end of the log;
//   5. restart logging at the remembered LSN, abort the live unprepared
//      transactions, close the open dictionaries, log a "recover" comment and
//      take a checkpoint.
//
// Returns 0 on success. Returns DB_RUNRECOVERY if the log is empty or
// unreadable or if a handler fails, the errno from toku_stat if env_dir cannot
// be stat'ed, and ENOTDIR if env_dir is not a directory. Several internal
// failures (logger open, cursor create/destroy, checkpoint) are asserted
// rather than returned.
//
// Note: declarations that follow the first "goto errorexit" are deliberately
// split from their first assignment (tlast, n) so that the goto does not jump
// over an initialization.
static int do_recovery(RECOVER_ENV renv, const char *env_dir, const char *log_dir) {
    int r;
    int rr = 0;
    TOKULOGCURSOR logcursor = NULL;
    struct log_entry *le = NULL;

    time_t tnow = time(NULL);
    fprintf(stderr, "%.24s Tokudb recovery starting in env %s\n", ctime(&tnow), env_dir);

    // The current working directory used to be captured here with getcwd()
    // into a fixed 1000-byte buffer and asserted on, but the value was never
    // read. That made recovery abort for long or unreadable working
    // directories for no benefit, so the capture has been removed.

    r = toku_logger_open(log_dir, renv->logger);
    assert(r == 0);

    // grab the last LSN so that it can be restored when the log is restarted
    LSN lastlsn = toku_logger_last_lsn(renv->logger);
    LSN thislsn;

    // there must be at least one log entry
    r = toku_logcursor_create(&logcursor, log_dir);
    assert(r == 0);

    r = toku_logcursor_last(logcursor, &le);
    if (r != 0) {
        if (tokudb_recovery_trace)
            fprintf(stderr, "RUNRECOVERY: %s:%d r=%d\n", __FUNCTION__, __LINE__, r);
        rr = DB_RUNRECOVERY; goto errorexit;
    }

    // start over with a fresh cursor for the actual scan
    r = toku_logcursor_destroy(&logcursor);
    assert(r == 0);

    r = toku_logcursor_create(&logcursor, log_dir);
    assert(r == 0);

    // env_dir must exist and be a directory
    {
        toku_struct_stat buf;
        if (toku_stat(env_dir, &buf)!=0) {
            rr = get_error_errno();
            fprintf(stderr, "%.24s Tokudb recovery error: directory does not exist: %s\n", ctime(&tnow), env_dir);
            goto errorexit;
        } else if (!S_ISDIR(buf.st_mode)) {
            fprintf(stderr, "%.24s Tokudb recovery error: this file is supposed to be a directory, but is not: %s\n", ctime(&tnow), env_dir);
            rr = ENOTDIR; goto errorexit;
        }
    }

    // scan backwards
    scan_state_init(&renv->ss);
    tnow = time(NULL);
    time_t tlast;
    tlast = tnow;
    fprintf(stderr, "%.24s Tokudb recovery scanning backward from %" PRIu64 "\n", ctime(&tnow), lastlsn.lsn);
    for (unsigned i=0; 1; i++) {

        // get the previous log entry (first time gets the last one)
        le = NULL;
        r = toku_logcursor_prev(logcursor, &le);
        if (tokudb_recovery_trace)
            recover_trace_le(__FUNCTION__, __LINE__, r, le);
        if (r != 0) {
            if (r == DB_NOTFOUND)
                break;
            rr = DB_RUNRECOVERY;
            goto errorexit;
        }

        // trace progress (the clock is only consulted every 1000 entries)
        if ((i % 1000) == 0) {
            tnow = time(NULL);
            if (tnow - tlast >= TOKUDB_RECOVERY_PROGRESS_TIME) {
                thislsn = toku_log_entry_get_lsn(le);
                fprintf(stderr, "%.24s Tokudb recovery scanning backward from %" PRIu64 " at %" PRIu64 " (%s)\n", ctime(&tnow), lastlsn.lsn, thislsn.lsn, recover_state(renv));
                tlast = tnow;
            }
        }

        // dispatch the log entry handler
        assert(renv->ss.ss == BACKWARD_BETWEEN_CHECKPOINT_BEGIN_END ||
               renv->ss.ss == BACKWARD_NEWER_CHECKPOINT_END);
        logtype_dispatch_assign(le, toku_recover_backward_, r, renv);
        if (tokudb_recovery_trace)
            recover_trace_le(__FUNCTION__, __LINE__, r, le);
        if (r != 0) {
            if (tokudb_recovery_trace)
                fprintf(stderr, "DB_RUNRECOVERY: %s:%d r=%d\n", __FUNCTION__, __LINE__, r);
            rr = DB_RUNRECOVERY;
            goto errorexit;
        }
        if (renv->goforward)
            break;
    }

    // run first callback
    if (recover_callback_fx)
        recover_callback_fx(recover_callback_args);

    // scan forwards
    // le is the turnaround entry; it is NULL (and this asserts) if the
    // backward scan ran off the beginning of the log without turning around
    assert(le);
    thislsn = toku_log_entry_get_lsn(le);
    tnow = time(NULL);
    fprintf(stderr, "%.24s Tokudb recovery starts scanning forward to %" PRIu64 " from %" PRIu64 " left %" PRIu64 " (%s)\n", ctime(&tnow), lastlsn.lsn, thislsn.lsn, lastlsn.lsn - thislsn.lsn, recover_state(renv));

    for (unsigned i=0; 1; i++) {

        // trace progress (the clock is only consulted every 1000 entries)
        if ((i % 1000) == 0) {
            tnow = time(NULL);
            if (tnow - tlast >= TOKUDB_RECOVERY_PROGRESS_TIME) {
                thislsn = toku_log_entry_get_lsn(le);
                fprintf(stderr, "%.24s Tokudb recovery scanning forward to %" PRIu64 " at %" PRIu64 " left %" PRIu64 " (%s)\n", ctime(&tnow), lastlsn.lsn, thislsn.lsn, lastlsn.lsn - thislsn.lsn, recover_state(renv));
                tlast = tnow;
            }
        }

        // dispatch the log entry handler (first time calls the forward handler for the log entry at the turnaround)
        assert(renv->ss.ss == FORWARD_BETWEEN_CHECKPOINT_BEGIN_END ||
               renv->ss.ss == FORWARD_NEWER_CHECKPOINT_END);
        logtype_dispatch_assign(le, toku_recover_, r, renv);
        if (tokudb_recovery_trace)
            recover_trace_le(__FUNCTION__, __LINE__, r, le);
        if (r != 0) {
            if (tokudb_recovery_trace)
                fprintf(stderr, "DB_RUNRECOVERY: %s:%d r=%d\n", __FUNCTION__, __LINE__, r);
            rr = DB_RUNRECOVERY;
            goto errorexit;
        }

        // get the next log entry
        le = NULL;
        r = toku_logcursor_next(logcursor, &le);
        if (tokudb_recovery_trace)
            recover_trace_le(__FUNCTION__, __LINE__, r, le);
        if (r != 0) {
            if (r == DB_NOTFOUND)
                break;
            rr = DB_RUNRECOVERY;
            goto errorexit;
        }
    }

    // verify the final recovery state
    assert(renv->ss.ss == FORWARD_NEWER_CHECKPOINT_END);

    r = toku_logcursor_destroy(&logcursor);
    assert(r == 0);

    // run second callback
    if (recover_callback2_fx)
        recover_callback2_fx(recover_callback2_args);

    // restart logging
    toku_logger_restart(renv->logger, lastlsn);

    // abort the live transactions
    {
        uint32_t n = recover_get_num_live_txns(renv);
        if (n > 0) {
            tnow = time(NULL);
            fprintf(stderr, "%.24s Tokudb recovery has %" PRIu32 " live transaction%s\n", ctime(&tnow), n, n > 1 ? "s" : "");
        }
    }
    recover_abort_all_live_txns(renv);
    // whatever is still live after the aborts is a prepared transaction
    {
        uint32_t n = recover_get_num_live_txns(renv);
        if (n > 0) {
            tnow = time(NULL);
            fprintf(stderr, "%.24s Tokudb recovery has %" PRIu32 " prepared transaction%s\n", ctime(&tnow), n, n > 1 ? "s" : "");
        }
    }

    // close the open dictionaries
    uint32_t n;
    n = file_map_get_num_dictionaries(&renv->fmap);
    if (n > 0) {
        tnow = time(NULL);
        fprintf(stderr, "%.24s Tokudb recovery closing %" PRIu32 " dictionar%s\n", ctime(&tnow), n, n > 1 ? "ies" : "y");
    }
    file_map_close_dictionaries(&renv->fmap, lastlsn);

    {
        // write a recovery log entry
        BYTESTRING recover_comment = { static_cast<uint32_t>(strlen("recover")), (char *) "recover" };
        toku_log_comment(renv->logger, NULL, true, 0, recover_comment);
    }

    // checkpoint
    tnow = time(NULL);
    fprintf(stderr, "%.24s Tokudb recovery making a checkpoint\n", ctime(&tnow));
    r = toku_checkpoint(renv->cp, renv->logger, NULL, NULL, NULL, NULL, RECOVERY_CHECKPOINT);
    assert(r == 0);
    tnow = time(NULL);
    fprintf(stderr, "%.24s Tokudb recovery done\n", ctime(&tnow));

    return 0;

 errorexit:
    tnow = time(NULL);
    fprintf(stderr, "%.24s Tokudb recovery failed %d\n", ctime(&tnow), rr);

    // release the log cursor if one is still open
    if (logcursor) {
        r = toku_logcursor_destroy(&logcursor);
        assert(r == 0);
    }

    return rr;
}

1477 1478
int
toku_recover_lock(const char *lock_dir, int *lockfd) {
1479 1480
    if (!lock_dir)
        return ENOENT;
1481
    int namelen=strlen(lock_dir);
1482
    char lockfname[namelen+sizeof(recovery_lock_file)];
1483

1484
    int l = snprintf(lockfname, sizeof(lockfname), "%s%s", lock_dir, recovery_lock_file);
1485 1486 1487
    assert(l+1 == (signed)(sizeof(lockfname)));
    *lockfd = toku_os_lock_file(lockfname);
    if (*lockfd < 0) {
1488
        int e = get_error_errno();
1489
        fprintf(stderr, "Couldn't run recovery because some other process holds the recovery lock %s\n", lockfname);
1490 1491 1492 1493 1494
        return e;
    }
    return 0;
}

1495 1496
int
toku_recover_unlock(int lockfd) {
1497
    int r = toku_os_unlock_file(lockfd);
1498 1499 1500
    if (r != 0) {
        return get_error_errno();
    }
1501 1502 1503
    return 0;
}

1504 1505


1506
int tokudb_recover(DB_ENV *env,
1507 1508 1509 1510
                   prepared_txn_callback_t    prepared_txn_callback,                   
                   keep_cachetable_callback_t keep_cachetable_callback,
                   TOKULOGGER logger,
                   const char *env_dir, const char *log_dir,
1511 1512
                   ft_compare_func bt_compare,
                   ft_update_func update_function,
1513 1514
                   generate_row_for_put_func generate_row_for_put,
                   generate_row_for_del_func generate_row_for_del,
1515
                   size_t cachetable_size) {
1516
    int r;
1517
    int lockfd = -1;
1518

1519
    r = toku_recover_lock(log_dir, &lockfd);
1520 1521
    if (r != 0)
        return r;
1522 1523

    int rr = 0;
Yoni Fogel's avatar
Yoni Fogel committed
1524
    if (tokudb_needs_recovery(log_dir, false)) {
1525
        struct recover_env renv;
1526
        r = recover_env_init(&renv,
1527 1528 1529 1530 1531 1532
                             env_dir,
                             env,
                             prepared_txn_callback,
                             keep_cachetable_callback,
                             logger,
                             bt_compare,
1533
                             update_function,
1534 1535
                             generate_row_for_put,
                             generate_row_for_del,
1536
                             cachetable_size);
1537 1538
        assert(r == 0);

1539
        rr = do_recovery(&renv, env_dir, log_dir);
1540

1541
        recover_env_cleanup(&renv);
1542
    }
1543

1544
    r = toku_recover_unlock(lockfd);
1545 1546
    if (r != 0)
        return r;
1547

1548
    return rr;
1549
}
1550 1551 1552 1553 1554 1555 1556 1557 1558

// Return 0 if recovery log exists, ENOENT if log is missing.
// ENOENT is also returned when a log cursor cannot be created on log_dir.
int
tokudb_recover_log_exists(const char * log_dir) {
    TOKULOGCURSOR logcursor;

    if (toku_logcursor_create(&logcursor, log_dir) != 0) {
        return ENOENT;
    }

    int exists_r = toku_logcursor_log_exists(logcursor);  // return ENOENT if no log
    int rclose = toku_logcursor_destroy(&logcursor);
    assert(rclose == 0);
    return exists_r;
}
1569 1570 1571 1572 1573 1574 1575 1576 1577 1578

// Test hook: installs the function (and its argument) that do_recovery runs
// after the backward scan. Pass NULL for callback_fx to disable it.
void toku_recover_set_callback (void (*callback_fx)(void*), void* callback_args) {
    recover_callback_args = callback_args;
    recover_callback_fx   = callback_fx;
}

// Test hook: installs the function (and its argument) that do_recovery runs
// after the forward scan. Pass NULL for callback_fx to disable it.
void toku_recover_set_callback2 (void (*callback_fx)(void*), void* callback_args) {
    recover_callback2_args = callback_args;
    recover_callback2_fx   = callback_fx;
}