/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc.  All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."

#include "fttypes.h"
#include "log-internal.h"
#include "rollback-apply.h"

// Report rollback progress for txn to its registered progress callback.
//
// Does nothing when no callback (txn->progress_poll_fun) is installed.
// Otherwise builds a progress record from txn->roll_info (total entries and
// entries processed so far) plus the two caller-supplied flags, and hands it
// to the callback together with txn->progress_poll_fun_extra.
static void
poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint) {
    if (!txn->progress_poll_fun) {
        return;  // nobody registered interest in progress for this txn
    }
    TOKU_TXN_PROGRESS_S report = {
        .entries_total         = txn->roll_info.num_rollentries,
        .entries_processed     = txn->roll_info.num_rollentries_processed,
        .is_commit             = is_commit,
        .stalled_on_checkpoint = stall_for_checkpoint,
    };
    txn->progress_poll_fun(&report, txn->progress_poll_fun_extra);
}

// Apply the commit action for a single rollback entry.
//
// Dispatches on the entry's type to the matching toku_commit_* handler
// (via rolltype_dispatch_assign), then counts the entry as processed.
// Every 1024 processed entries the txn's progress callback is polled with
// is_commit = true and stalled_on_checkpoint = false.
//
// Returns the value produced by the dispatched handler (0 on success).
// Note that the entry is counted as processed even if the handler fails.
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) {
    int r=0;
    rolltype_dispatch_assign(item, toku_commit_, r, txn, lsn);
    txn->roll_info.num_rollentries_processed++;
    // Poll only periodically, presumably to keep callback overhead small
    // relative to the rollback work itself.
    if (txn->roll_info.num_rollentries_processed % 1024 == 0) {
        poll_txn_progress_function(txn, true, false);
    }
    return r;
}

// Apply the abort (undo) action for a single rollback entry.
//
// Dispatches on the entry's type to the matching toku_rollback_* handler
// (via rolltype_dispatch_assign), then counts the entry as processed.
// Every 1024 processed entries the txn's progress callback is polled with
// is_commit = false and stalled_on_checkpoint = false.
//
// Returns the value produced by the dispatched handler (0 on success).
// Note that the entry is counted as processed even if the handler fails.
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, LSN lsn) {
    int r=0;
    rolltype_dispatch_assign(item, toku_rollback_, r, txn, lsn);
    txn->roll_info.num_rollentries_processed++;
    // Poll only periodically, presumably to keep callback overhead small
    // relative to the rollback work itself.
    if (txn->roll_info.num_rollentries_processed % 1024 == 0) {
        poll_txn_progress_function(txn, false, false);
    }
    return r;
}

43 44 45
int
note_ft_used_in_txns_parent(const FT &ft, uint32_t UU(index), TOKUTXN const child);
int
46
note_ft_used_in_txns_parent(const FT &ft, uint32_t UU(index), TOKUTXN const child) {
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
    TOKUTXN parent = child->parent;
    toku_txn_maybe_note_ft(parent, ft);
    if (ft->txnid_that_created_or_locked_when_empty == toku_txn_get_txnid(child)) {
        //Pass magic "no rollback needed" flag to parent.
        ft->txnid_that_created_or_locked_when_empty = toku_txn_get_txnid(parent);
    }
    if (ft->txnid_that_suppressed_recovery_logs == toku_txn_get_txnid(child)) {
        //Pass magic "no recovery needed" flag to parent.
        ft->txnid_that_suppressed_recovery_logs = toku_txn_get_txnid(parent);
    }
    return 0;
}

// Walk every rollback log node owned by txn, newest to oldest, applying func
// to each entry and releasing each node once it has been processed.
//
// The walk starts at the txn's in-progress (current) rollback log if it has
// one, otherwise at the tail of its spilled rollback logs, and follows each
// node's `previous` link until ROLLBACK_NONE. Within a node, entries are
// consumed starting at newest_logentry and following `prev`, so entries are
// applied in the reverse of the order they were logged.
//
// After each node is consumed, txn->roll_info is updated so it no longer
// refers to the node that toku_rollback_log_unpin_and_remove releases:
// current_rollback is cleared (for the first node, if it was the current
// log), or spilled_rollback_tail is advanced; once the spilled head
// (sequence 0) has been consumed, spilled_rollback_head is cleared too.
//
// func may be NULL, in which case nodes are released without applying any
// entries.
//
// Returns 0 on success, or the first nonzero value returned by func.
// NOTE(review): on that early return the node being processed stays pinned
// and txn->roll_info still references it; the callers in this file assert
// that the result is 0.
static int
apply_txn(TOKUTXN txn, LSN lsn, apply_rollback_item func) {
    int r = 0;

    BLOCKNUM next_log      = ROLLBACK_NONE;
    uint32_t next_log_hash = 0;

    // Start at the newest node: the in-progress log if present, otherwise
    // the tail of the spilled logs.
    bool is_current = false;
    if (txn_has_current_rollback_log(txn)) {
        next_log      = txn->roll_info.current_rollback;
        next_log_hash = txn->roll_info.current_rollback_hash;
        is_current = true;
    }
    else if (txn_has_spilled_rollback_logs(txn)) {
        next_log      = txn->roll_info.spilled_rollback_tail;
        next_log_hash = txn->roll_info.spilled_rollback_tail_hash;
    }

    // Each node is verified against the sequence number one less than the
    // previously visited node's, starting from num_rollback_nodes - 1.
    uint64_t last_sequence = txn->roll_info.num_rollback_nodes;
    bool found_head = false;
    while (next_log.b != ROLLBACK_NONE.b) {
        ROLLBACK_LOG_NODE log;
        //pin log
        toku_get_and_pin_rollback_log(txn, next_log, next_log_hash, &log);
        toku_rollback_verify_contents(log, txn->txnid64, last_sequence - 1);

        toku_maybe_prefetch_previous_rollback_log(txn, log);

        last_sequence = log->sequence;
        if (func) {
            struct roll_entry *item;
            // Pop entries newest-first, unlinking each before applying it.
            while ((item=log->newest_logentry)) {
                log->newest_logentry = item->prev;
                r = func(txn, item, lsn);
                if (r!=0) return r;
            }
        }
        if (next_log.b == txn->roll_info.spilled_rollback_head.b) {
            // The spilled head is the oldest node and must be visited once.
            assert(!found_head);
            found_head = true;
            assert(log->sequence == 0);
        }
        next_log      = log->previous;
        next_log_hash = log->previous_hash;
        {
            //Clean up transaction structure to prevent
            //toku_txn_close from double-freeing
            if (is_current) {
                txn->roll_info.current_rollback      = ROLLBACK_NONE;
                txn->roll_info.current_rollback_hash = 0;
                is_current = false;
            }
            else {
                txn->roll_info.spilled_rollback_tail      = next_log;
                txn->roll_info.spilled_rollback_tail_hash = next_log_hash;
            }
            if (found_head) {
                assert(next_log.b == ROLLBACK_NONE.b);
                txn->roll_info.spilled_rollback_head      = next_log;
                txn->roll_info.spilled_rollback_head_hash = next_log_hash;
            }
        }
        toku_rollback_log_unpin_and_remove(txn, log);
    }
    return r;
}

//Commit each entry in the rollback log.
//If the transaction has a parent, it just promotes its information to its parent.
//Commit each entry in the rollback log.
//If the transaction has a parent, it just promotes its information to its parent.
//
// Root transaction (txn->parent == NULL): apply the commit action of every
// rollback entry via apply_txn(txn, lsn, toku_commit_rollback_item).
//
// Child transaction: nothing is applied here. Instead the child's rollback
// state is handed to the parent:
//   1. If the child spilled rollback logs, a rollinclude entry referencing
//      them (head/tail and node count, excluding the in-progress log) is saved
//      in the parent, and the child drops its references to them.
//   2. If the child has an in-progress rollback log, its entries are linked
//      after the parent's newest entry, its memarena buffers are moved to the
//      parent's log, and the child's node is released.
//   3. The child's open FTs, checkpoint-before-commit and fsync-on-commit
//      requirements, and rollentry count are merged into the parent.
//
// Returns 0 on success, or the error from toku_logger_save_rollback_rollinclude.
int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
    int r=0;
    if (txn->parent!=0) {
        // First we must put a rollinclude entry into the parent if we spilled
        if (txn_has_spilled_rollback_logs(txn)) {
            uint64_t num_nodes = txn->roll_info.num_rollback_nodes;
            if (txn_has_current_rollback_log(txn)) {
                num_nodes--; //Don't count the in-progress rollback log.
            }
            r = toku_logger_save_rollback_rollinclude(txn->parent, txn->txnid64, num_nodes,
                                                      txn->roll_info.spilled_rollback_head, txn->roll_info.spilled_rollback_head_hash,
                                                      txn->roll_info.spilled_rollback_tail, txn->roll_info.spilled_rollback_tail_hash);
            if (r!=0) return r;
            //Remove ownership from child.
            txn->roll_info.spilled_rollback_head      = ROLLBACK_NONE;
            txn->roll_info.spilled_rollback_head_hash = 0;
            txn->roll_info.spilled_rollback_tail      = ROLLBACK_NONE;
            txn->roll_info.spilled_rollback_tail_hash = 0;
        }
        // If we're committing a child rollback, put its entries into the parent
        // by pinning both child and parent and then linking the child log entry
        // list to the end (newest side) of the parent log entry list.
        if (txn_has_current_rollback_log(txn)) {
            //Pin parent log
            ROLLBACK_LOG_NODE parent_log;
            toku_get_and_pin_rollback_log_for_new_entry(txn->parent, &parent_log);

            //Pin child log
            ROLLBACK_LOG_NODE child_log;
            toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback,
                    txn->roll_info.current_rollback_hash, &child_log);
            toku_rollback_verify_contents(child_log, txn->txnid64, txn->roll_info.num_rollback_nodes - 1);

            // Append the child's list after the parent's newest entry.
            // (If the child log is empty the parent's list is left unchanged.)
            if (child_log->oldest_logentry) {
                // There are some entries, so link them in.
                child_log->oldest_logentry->prev = parent_log->newest_logentry;
                if (!parent_log->oldest_logentry) {
                    parent_log->oldest_logentry = child_log->oldest_logentry;
                }
                parent_log->newest_logentry = child_log->newest_logentry;
                parent_log->rollentry_resident_bytecount += child_log->rollentry_resident_bytecount;
                txn->parent->roll_info.rollentry_raw_count         += txn->roll_info.rollentry_raw_count;
                child_log->rollentry_resident_bytecount = 0;
            }
            // The entries now belong to the parent's list.
            child_log->newest_logentry = child_log->oldest_logentry = 0;
            // Put all the memarena data into the parent.
            if (memarena_total_size_in_use(child_log->rollentry_arena) > 0) {
                // If there are no bytes to move, then just leave things alone, and let the memory be reclaimed when the txn is closed.
                memarena_move_buffers(parent_log->rollentry_arena, child_log->rollentry_arena);
            }
            toku_rollback_log_unpin_and_remove(txn, child_log);
            txn->roll_info.current_rollback = ROLLBACK_NONE;
            txn->roll_info.current_rollback_hash = 0;

            toku_maybe_spill_rollbacks(txn->parent, parent_log);
            toku_rollback_log_unpin(txn->parent, parent_log);
            assert(r == 0);
        }

        // Note the open FTs; the child's set must be merged into the parent's.
        r = txn->open_fts.iterate<struct tokutxn, note_ft_used_in_txns_parent>(txn);
        assert(r==0);

        // Merge the list of headers that must be checkpointed before commit
        if (txn->checkpoint_needed_before_commit) {
            txn->parent->checkpoint_needed_before_commit = true;
        }

        //If this transaction needs an fsync (if it commits)
        //save that in the parent.  Since the commit really happens in the root txn.
        txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit;
        txn->parent->roll_info.num_rollentries       += txn->roll_info.num_rollentries;
    } else {
        r = apply_txn(txn, lsn, toku_commit_rollback_item);
        assert(r==0);
    }

    return r;
}

// Abort txn: apply the undo action of every entry in its rollback logs,
// newest first, releasing each rollback log node as it is consumed.
// Any failure from an undo handler trips the assertion; the status from
// apply_txn is returned unchanged.
int toku_rollback_abort(TOKUTXN txn, LSN lsn) {
    const int status = apply_txn(txn, lsn, toku_abort_rollback_item);
    assert(status == 0);
    return status;
}