txn.c 36.9 KB
Newer Older
1
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
// vim: expandtab:ts=8:sw=4:softtabstop=4:
3
#ident "$Id$"
4
#ident "Copyright (c) 2007-2010 Tokutek Inc.  All rights reserved."
5 6
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."

7

8 9
#include "includes.h"
#include "txn.h"
10
#include "checkpoint.h"
11
#include "ule.h"
12
#include <valgrind/helgrind.h>
Zardosht Kasheff's avatar
Zardosht Kasheff committed
13 14 15

// Debug switch. When TRUE, toku_txn_create_txn, toku_txn_commit_with_lsn,
// toku_txn_abort_with_lsn and toku_txn_destroy_txn each call
// verify_snapshot_system() on the logger (presumably a consistency check of
// the MVCC snapshot bookkeeping).
BOOL garbage_collection_debug = FALSE;

16
// Forward declaration: used by the create/commit/abort/destroy paths below when
// garbage_collection_debug is set. Presumably defined later in this file.
static void verify_snapshot_system(TOKULOGGER logger);
17

18 19 20 21 22
///////////////////////////////////////////////////////////////////////////////////
// Engine status
//
// Status is intended for display to humans to help understand system behavior.
// It does not need to be perfectly thread-safe.
23

24 25 26
static TXN_STATUS_S txn_status;

#define STATUS_INIT(k,t,l) { \
27 28 29
    txn_status.status[k].keyname = #k; \
    txn_status.status[k].type    = t;  \
    txn_status.status[k].legend  = "txn: " l; \
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
    }

static void
status_init(void) {
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.
    STATUS_INIT(TXN_BEGIN,            UINT64,   "begin");
    STATUS_INIT(TXN_COMMIT,           UINT64,   "successful commits");
    STATUS_INIT(TXN_ABORT,            UINT64,   "aborts");
    STATUS_INIT(TXN_CLOSE,            UINT64,   "close (should be sum of aborts and commits)");
    STATUS_INIT(TXN_NUM_OPEN,         UINT64,   "number currently open (should be begin - close)");
    STATUS_INIT(TXN_MAX_OPEN,         UINT64,   "max number open simultaneously");
    STATUS_INIT(TXN_OLDEST_LIVE,      UINT64,   "xid of oldest live transaction");
    STATUS_INIT(TXN_OLDEST_STARTTIME, UNIXTIME, "start time of oldest live transaction");
    txn_status.initialized = true;
}
#undef STATUS_INIT

// Shorthand for the numeric value slot of status row x in the file-level txn_status table.
#define STATUS_VALUE(x) txn_status.status[x].value.num
49 50

void 
51 52
toku_txn_get_status(TOKULOGGER logger, TXN_STATUS s) {
    if (!txn_status.initialized)
53
        status_init();
54 55 56 57 58 59
    {
        time_t oldest_starttime;
        STATUS_VALUE(TXN_OLDEST_LIVE) = toku_logger_get_oldest_living_xid(logger, &oldest_starttime);
        STATUS_VALUE(TXN_OLDEST_STARTTIME) = (uint64_t) oldest_starttime;
    }
    *s = txn_status;
60 61
}

62 63 64
int 
toku_txn_begin_txn (
    DB_TXN  *container_db_txn,
65
    TOKUTXN parent_tokutxn, 
66
    TOKUTXN *tokutxn,
67 68 69 70
    TOKULOGGER logger, 
    TXN_SNAPSHOT_TYPE snapshot_type
    ) 
{
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
    int r = toku_txn_begin_with_xid(parent_tokutxn, tokutxn, logger, TXNID_NONE, snapshot_type, container_db_txn);
    return r;
}

// Create a txn (toku_txn_create_txn) and start it (toku_txn_start_txn), but only
// if creation succeeded. Returns the first nonzero error, else the result of
// the start step. On success *tokutxn is the started txn.
int 
toku_txn_begin_with_xid (
    TOKUTXN parent_tokutxn, 
    TOKUTXN *tokutxn, 
    TOKULOGGER logger, 
    TXNID xid, 
    TXN_SNAPSHOT_TYPE snapshot_type,
    DB_TXN *container_db_txn
    ) 
{
    int err = toku_txn_create_txn(tokutxn, parent_tokutxn, logger, xid, snapshot_type, container_db_txn);
    if (err != 0) {
        return err;
    }
    return toku_txn_start_txn(*tokutxn);
}

// Return the ydb-level DB_TXN recorded as this txn's container (whatever was
// passed at creation or last set via toku_txn_set_container_db_txn).
DB_TXN *
toku_txn_get_container_db_txn (TOKUTXN tokutxn) {
    return tokutxn->container_db_txn;
}

97 98 99 100
// Record container as the ydb-level DB_TXN that wraps this ft-level txn.
void toku_txn_set_container_db_txn (TOKUTXN tokutxn, DB_TXN*container) {
    tokutxn->container_db_txn = container;
}

Barry Perlman's avatar
Barry Perlman committed
101
// Create list of root transactions that were live when this txn began.
102 103 104
static int
setup_live_root_txn_list(TOKUTXN txn) {
    OMT global = txn->logger->live_root_txns;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
105 106 107 108
    int r = toku_omt_clone_noptr(
        &txn->live_root_txn_list,
        global
        );
109 110 111
    return r;
}

Barry Perlman's avatar
Barry Perlman committed
112 113 114
// Add this txn to the global list of txns that have their own snapshots.
// (Note, if a txn is a child that creates its own snapshot, then that child xid
// is the xid stored in the global list.) 
115 116 117 118
static int
snapshot_txnids_note_txn(TOKUTXN txn) {
    int r;
    OMT txnids = txn->logger->snapshot_txnids;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
119
    r = toku_omt_insert_at(txnids, (OMTVALUE) txn->txnid64, toku_omt_size(txnids));
120
    assert_zero(r);
121 122 123
    return r;
}

Barry Perlman's avatar
Barry Perlman committed
124 125
// If live txn is not in reverse live list, then add it.
// If live txn is in reverse live list, update it by setting second xid in pair to new txn that is being started.
126 127 128
static int
live_list_reverse_note_txn_start_iter(OMTVALUE live_xidv, u_int32_t UU(index), void*txnv) {
    TOKUTXN txn = txnv;
Barry Perlman's avatar
Barry Perlman committed
129
    TXNID xid   = txn->txnid64;     // xid of new txn that is being started
Zardosht Kasheff's avatar
Zardosht Kasheff committed
130
    TXNID live_xid = (TXNID)live_xidv;    // xid on the new txn's live list
131 132 133 134 135 136
    OMTVALUE pairv;
    XID_PAIR pair;
    uint32_t idx;

    int r;
    OMT reverse = txn->logger->live_list_reverse;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
137
    r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, (void *)live_xid, &pairv, &idx);
138 139
    if (r==0) {
        pair = pairv;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
140
        invariant(pair->xid1 == live_xid); //sanity check
141 142 143 144 145 146 147
        invariant(pair->xid2 < xid);        //Must be older
        pair->xid2 = txn->txnid64;
    }
    else {
        invariant(r==DB_NOTFOUND);
        //Make new entry
        XMALLOC(pair);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
148
        pair->xid1 = live_xid;
149 150
        pair->xid2 = txn->txnid64;
        r = toku_omt_insert_at(reverse, pair, idx);
151
        assert_zero(r);
152 153 154 155
    }
    return r;
}

Barry Perlman's avatar
Barry Perlman committed
156 157 158 159 160 161 162
// Maintain the reverse live list.  The reverse live list is a list of xid pairs.  The first xid in the pair
// is a txn that was live when some txn began, and the second xid in the pair is the newest still-live xid to 
// have that first xid in its live list.  (The first xid may be closed, it only needed to be live when the 
// second txn began.)
// When a new txn begins, we need to scan the live list of this new txn.  For each live txn, we either 
// add it to the reverse live list (if it is not already there), or update to the reverse live list so
// that this new txn is the second xid in the pair associated with the txn in the live list.
163 164 165 166 167
static int
live_list_reverse_note_txn_start(TOKUTXN txn) {
    int r;

    r = toku_omt_iterate(txn->live_root_txn_list, live_list_reverse_note_txn_start_iter, txn);
168
    assert_zero(r);
169 170 171
    return r;
}

172
// Mark an XA xid as "no xid" (formatID == -1, per the XA spec).
// The two length fields are zeroed too. Otherwise, on a freshly XMALLOC'd txn,
// they would stay indeterminate. Readers such as copy_xid (memcpy length) and
// toku_logger_get_txn_from_xid (length comparison) would then use garbage.
static void invalidate_xa_xid (TOKU_XA_XID *xid) {
    ANNOTATE_NEW_MEMORY(xid, sizeof(*xid)); // tell helgrind to treat this as new memory
    xid->formatID = -1; // According to the XA spec, -1 means "invalid data"
    xid->gtrid_length = 0;
    xid->bqual_length = 0;
}

177 178
int 
toku_txn_create_txn (
179
    TOKUTXN *tokutxn, 
180
    TOKUTXN parent_tokutxn, 
181 182
    TOKULOGGER logger, 
    TXNID xid, 
183 184
    TXN_SNAPSHOT_TYPE snapshot_type,
    DB_TXN *container_db_txn
185 186
    ) 
{
187
    if (logger->is_panicked) return EINVAL;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
188 189 190
    if (garbage_collection_debug) {
        verify_snapshot_system(logger);
    }
191
    assert(logger->rollback_cachefile);
192
    TOKUTXN XMALLOC(result);
193
    result->starttime = time(NULL);  // getting timestamp in seconds is a cheap call
194
    int r;
195
    r = toku_omt_create(&result->open_fts);
196
    assert_zero(r);
197

198 199
    result->logger = logger;
    result->parent = parent_tokutxn;
200 201 202 203
    result->num_rollentries = 0;
    result->num_rollentries_processed = 0;
    result->progress_poll_fun = NULL;
    result->progress_poll_fun_extra = NULL;
204 205 206 207 208 209 210
    result->spilled_rollback_head      = ROLLBACK_NONE;
    result->spilled_rollback_tail      = ROLLBACK_NONE;
    result->spilled_rollback_head_hash = 0;
    result->spilled_rollback_tail_hash = 0;
    result->current_rollback      = ROLLBACK_NONE;
    result->current_rollback_hash = 0;
    result->num_rollback_nodes = 0;
211 212
    result->snapshot_type = snapshot_type;
    result->snapshot_txnid64 = TXNID_NONE;
213 214 215 216 217
    result->container_db_txn = container_db_txn;

    result->rollentry_raw_count = 0;
    result->force_fsync_on_commit = FALSE;
    result->recovered_from_checkpoint = FALSE;
218
    result->checkpoint_needed_before_commit = FALSE;
219
    result->state = TOKUTXN_LIVE;
220
    invalidate_xa_xid(&result->xa_xid);
221 222 223 224 225 226 227 228 229 230 231 232
    result->do_fsync = FALSE;

    toku_txn_ignore_init(result); // 2954

    result->txnid64 = xid;
    result->xids = NULL;

    *tokutxn = result;

    STATUS_VALUE(TXN_BEGIN)++;
    STATUS_VALUE(TXN_NUM_OPEN)++;
    if (STATUS_VALUE(TXN_NUM_OPEN) > STATUS_VALUE(TXN_MAX_OPEN))
233
        STATUS_VALUE(TXN_MAX_OPEN) = STATUS_VALUE(TXN_NUM_OPEN);
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248

    if (garbage_collection_debug) {
        verify_snapshot_system(logger);
    }
    return 0;
}

// Start a txn created by toku_txn_create_txn.
// - Assigns an xid from an xbegin log entry if none was given.
// - Builds the txn's XIDS from its parent's (or the root XIDS).
// - Registers the txn, under logger->txn_list_lock, in the logger's live txn
//   list, in the live root txn list (for a root txn), and in the MVCC snapshot
//   structures when the txn takes a snapshot.
// The logger's oldest-living-xid bookkeeping is derived from live_txns, so it
// is now also done while txn_list_lock is held. Before, live_txns was read
// outside the lock that protects its mutation.
int
toku_txn_start_txn(TOKUTXN txn) {
    TOKULOGGER logger = txn->logger;
    TOKUTXN parent = txn->parent;
    int r;
    if (txn->txnid64 == TXNID_NONE) {
        LSN first_lsn;
        r = toku_log_xbegin(logger, &first_lsn, 0, parent ? parent->txnid64 : 0);
        assert_zero(r);
        txn->txnid64 = first_lsn.lsn;
    } 
    XIDS parent_xids;
    if (parent == NULL)
        parent_xids = xids_get_root_xids();
    else
        parent_xids = parent->xids;
    r = xids_create_child(parent_xids, &txn->xids, txn->txnid64);
    assert_zero(r);

    toku_mutex_lock(&logger->txn_list_lock);
    {
        // If no txn is live, this one becomes the oldest living txn.
        if (toku_omt_size(logger->live_txns) == 0) {
            assert(logger->oldest_living_xid == TXNID_NONE_LIVING);
            logger->oldest_living_xid = txn->txnid64;
            logger->oldest_living_starttime = txn->starttime;
        }
        assert(logger->oldest_living_xid <= txn->txnid64);

        //Add txn to list (omt) of live transactions
        //We know it is the newest one.
        r = toku_omt_insert_at(logger->live_txns, txn, toku_omt_size(logger->live_txns));
        assert_zero(r);

        //
        // maintain the data structures necessary for MVCC:
        //  1. add txn to list of live_root_txns if this is a root transaction
        //  2. if the transaction is creating a snapshot:
        //    - create a live list for the transaction
        //    - add the id to the list of snapshot ids
        //    - make the necessary modifications to the live_list_reverse
        //
        // The order of operations is important here, and must be taken
        // into account when the transaction is closed. The txn is added
        // to the live_root_txns first (if it is a root txn). This has the implication
        // that a root level snapshot transaction is in its own live list. This fact
        // is taken into account when the transaction is closed.
        //

        // add ancestor information, and maintain global live root txn list
        if (parent == NULL) {
            //Add txn to list (omt) of live root txns
            r = toku_omt_insert_at(logger->live_root_txns, (OMTVALUE) txn->txnid64, toku_omt_size(logger->live_root_txns)); //We know it is the newest one.
            assert_zero(r);
            txn->ancestor_txnid64 = txn->txnid64;
        }
        else {
            txn->ancestor_txnid64 = parent->ancestor_txnid64;
        }

        // setup information for snapshot reads
        if (txn->snapshot_type != TXN_SNAPSHOT_NONE) {
            // in this case, either this is a root level transaction that needs its live list setup, or it
            // is a child transaction that specifically asked for its own snapshot
            if (parent == NULL || txn->snapshot_type == TXN_SNAPSHOT_CHILD) {
                r = setup_live_root_txn_list(txn);  
                assert_zero(r);
                txn->snapshot_txnid64 = txn->txnid64;
                r = snapshot_txnids_note_txn(txn);
                assert_zero(r);
                r = live_list_reverse_note_txn_start(txn);
                assert_zero(r);
            }
            // in this case, it is a child transaction that specified its snapshot to be that 
            // of the root transaction
            else if (txn->snapshot_type == TXN_SNAPSHOT_ROOT) {
                txn->live_root_txn_list = parent->live_root_txn_list;
                txn->snapshot_txnid64 = parent->snapshot_txnid64;
            }
            else {
                assert(FALSE);
            }
        }
    }
    toku_mutex_unlock(&logger->txn_list_lock);
    return 0;
}

328 329 330 331 332 333
// Used during recovery to rebuild a txn from its saved TXNINFO. It:
// - copies the rollback counters and fsync flag,
// - re-registers every FT the txn had open (toku_txn_note_ft),
// - restores the rollback log block numbers and recomputes their cachetable
//   hashes in the logger's rollback cachefile,
// - marks the txn as recovered from a checkpoint.
// Always returns 0; a failure to note an FT asserts.
int
toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info) {
    txn->rollentry_raw_count = info->rollentry_raw_count;
    for (uint32_t i = 0; i < info->num_fts; i++) {
        int r = toku_txn_note_ft(txn, info->open_fts[i]);
        assert_zero(r);
    }
    txn->force_fsync_on_commit = info->force_fsync_on_commit;
    txn->num_rollback_nodes    = info->num_rollback_nodes;
    txn->num_rollentries       = info->num_rollentries;

    CACHEFILE cf = txn->logger->rollback_cachefile;
    txn->spilled_rollback_head      = info->spilled_rollback_head;
    txn->spilled_rollback_head_hash = toku_cachetable_hash(cf, txn->spilled_rollback_head);
    txn->spilled_rollback_tail      = info->spilled_rollback_tail;
    txn->spilled_rollback_tail_hash = toku_cachetable_hash(cf, txn->spilled_rollback_tail);
    txn->current_rollback           = info->current_rollback;
    txn->current_rollback_hash      = toku_cachetable_hash(cf, txn->current_rollback);

    txn->recovered_from_checkpoint = TRUE;
    return 0;
}
358

359
int toku_txn_commit_txn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv,
360
                        TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
361
                        bool release_multi_operation_client_lock)
362 363 364
// Effect: Doesn't close the txn, just performs the commit operations.
//  If release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{
365
    return toku_txn_commit_with_lsn(txn, nosync, yield, yieldv, ZERO_LSN,
366 367
                                    poll, poll_extra,
                                    release_multi_operation_client_lock);
368 369
}

370 371 372 373 374 375

// Flag txn as needing a checkpoint before it commits. toku_txn_requires_checkpoint
// reports this flag for root txns only.
void
toku_txn_require_checkpoint_on_commit(TOKUTXN txn) {
    txn->checkpoint_needed_before_commit = TRUE;
}

376 377 378 379 380
// Argument/result bundle for log_xcommit.
struct xcommit_info {
    int r;          // out: result of toku_log_xcommit
    TOKUTXN txn;    // in: txn being committed
};

Zardosht Kasheff's avatar
Zardosht Kasheff committed
381
// TRUE only for a root txn (no parent) that has been flagged with
// toku_txn_require_checkpoint_on_commit. Child txns never require one.
BOOL toku_txn_requires_checkpoint(TOKUTXN txn) {
    if (txn->parent) {
        return FALSE;
    }
    return txn->checkpoint_needed_before_commit ? TRUE : FALSE;
}

385 386
//Called during a yield (ydb lock NOT held).
static void
387
log_xcommit(void *thunk) {
388 389
    struct xcommit_info *info = thunk;
    TOKUTXN txn = info->txn;
390
    info->r = toku_log_xcommit(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64); // exits holding neither of the tokulogger locks.
391 392
}

393
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn,
394
                             TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
395
                             bool release_multi_operation_client_lock) 
396 397
// Effect: Among other things: if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{
398
    if (txn->state==TOKUTXN_PREPARING) {
399
        invalidate_xa_xid(&txn->xa_xid);
400 401
        toku_list_remove(&txn->prepared_txns_link);
    }
402
    txn->state = TOKUTXN_COMMITTING;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
403 404 405
    if (garbage_collection_debug) {
        verify_snapshot_system(txn->logger);
    }
406 407
    int r;
    // panic handled in log_commit
408

Zardosht Kasheff's avatar
Zardosht Kasheff committed
409 410 411 412 413 414 415 416 417
    // Child transactions do not actually 'commit'.  They promote their 
    // changes to parent, so no need to fsync if this txn has a parent. The
    // do_sync state is captured in the txn for txn_maybe_fsync_log function
    // Additionally, if the transaction was first prepared, we do not need to 
    // fsync because the prepare caused an fsync of the log. In this case, 
    // we do not need an additional of the log. We rely on the client running 
    // recovery to properly recommit this transaction if the commit 
    // does not make it to disk. In the case of MySQL, that would be the
    // binary log.
418
    txn->do_fsync = !txn->parent && (txn->force_fsync_on_commit || (!nosync && txn->num_rollentries>0));
419

420 421 422
    txn->progress_poll_fun = poll;
    txn->progress_poll_fun_extra = poll_extra;

423 424 425 426 427
    {
        struct xcommit_info info = {
            .r = 0,
            .txn = txn,
        };
428
        log_xcommit(&info);
429 430
        r = info.r;
    }
431
    if (r==0) {
432 433
        r = toku_rollback_commit(txn, yield, yieldv, oplsn);
        STATUS_VALUE(TXN_COMMIT)++;
434 435 436
    }
    // Make sure we release that lock (even if there was an error)
    if (release_multi_operation_client_lock) toku_multi_operation_client_unlock();
437 438 439
    return r;
}

440
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv,
441
                       TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
442
                       bool release_multi_operation_client_lock)
443 444 445 446
// Effect: Doesn't close the txn, just performs the abort operations.
// If release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{
    return toku_txn_abort_with_lsn(txn, yield, yieldv, ZERO_LSN, poll, poll_extra, release_multi_operation_client_lock);
447 448
}

449
int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
450
                            TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
451
                            bool release_multi_operation_client_lock)
452 453
// Effect: Ammong other things, if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{
454
    if (txn->state==TOKUTXN_PREPARING) {
455
        invalidate_xa_xid(&txn->xa_xid);
456 457
        toku_list_remove(&txn->prepared_txns_link);
    }
458
    txn->state = TOKUTXN_ABORTING;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
459 460 461
    if (garbage_collection_debug) {
        verify_snapshot_system(txn->logger);
    }
462 463 464 465
    //printf("%s:%d aborting\n", __FILE__, __LINE__);
    // Must undo everything.  Must undo it all in reverse order.
    // Build the reverse list
    //printf("%s:%d abort\n", __FILE__, __LINE__);
466

467 468
    txn->progress_poll_fun = poll;
    txn->progress_poll_fun_extra = poll_extra;
469 470 471
    int r = 0;
    txn->do_fsync = FALSE;
    r = toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64);
472
    if (r==0)  {
473 474
        r = toku_rollback_abort(txn, yield, yieldv, oplsn);
        STATUS_VALUE(TXN_ABORT)++;
475 476 477
    }
    // Make sure we multi_operation_client_unlock release will happen even if there is an error
    if (release_multi_operation_client_lock) toku_multi_operation_client_unlock();
478 479 480
    return r;
}

481
// Copy an XA xid: the formatID, both lengths, and the gtrid_length+bqual_length
// meaningful bytes of data.
// The lengths can come from a client (toku_txn_prepare_txn), so they are checked
// against the destination buffer before the memcpy. A negative length turns into
// a huge size_t and fails the check, which rules out a buffer overflow.
static void copy_xid (TOKU_XA_XID *dest, TOKU_XA_XID *source) {
    size_t gtrid_len = (size_t) source->gtrid_length;
    size_t bqual_len = (size_t) source->bqual_length;
    assert(gtrid_len <= sizeof(dest->data) && bqual_len <= sizeof(dest->data) - gtrid_len);
    ANNOTATE_NEW_MEMORY(dest, sizeof(*dest));
    dest->formatID     = source->formatID;
    dest->gtrid_length = source->gtrid_length;
    dest->bqual_length = source->bqual_length;
    memcpy(dest->data, source->data, gtrid_len + bqual_len);
}

489
// Prepare a root txn for XA two-phase commit:
// - mark it TOKUTXN_PREPARING,
// - record whether the eventual commit needs an fsync,
// - remember its XA xid and put it on the logger's prepared list,
// - log an xprepare entry.
// Children are not prepared at all. They return 0 before the state change.
// Previously a child was marked PREPARING without being pushed on the prepared
// list, so a later commit/abort would toku_list_remove() a never-linked
// prepared_txns_link.
int toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xa_xid) {
    assert(txn->state==TOKUTXN_LIVE);
    if (txn->parent) return 0; // nothing to do if there's a parent.
    txn->state = TOKUTXN_PREPARING; // This state transition must be protected against begin_checkpoint.  Right now it uses the ydb lock.
    // Do we need to do an fsync?
    txn->do_fsync = (txn->force_fsync_on_commit || txn->num_rollentries>0);
    copy_xid(&txn->xa_xid, xa_xid);
    // This list will go away with #4683, so we won't need the ydb lock for this anymore.
    toku_list_push(&txn->logger->prepared_txns, &txn->prepared_txns_link);
    return toku_log_xprepare(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64, xa_xid);
}

501
// Copy the XA xid stored on txn (set by toku_txn_prepare_txn) into *xid.
void toku_txn_get_prepared_xa_xid (TOKUTXN txn, TOKU_XA_XID *xid) {
    TOKU_XA_XID *stored = &txn->xa_xid;
    copy_xid(xid, stored);
}

505
// Linear search of logger->live_txns for the txn whose XA xid equals *xid.
// A match requires the same formatID, the same lengths, and the same data bytes.
// On a match, store the txn's container DB_TXN in *txnp and return 0; otherwise
// return DB_NOTFOUND.
int toku_logger_get_txn_from_xid (TOKULOGGER logger, TOKU_XA_XID *xid, DB_TXN **txnp) {
    const uint32_t n_live = toku_omt_size(logger->live_txns);
    for (uint32_t pos = 0; pos < n_live; pos++) {
        OMTVALUE v;
        int r = toku_omt_fetch(logger->live_txns, pos, &v);
        assert_zero(r);
        TOKUTXN candidate = v;
        TOKU_XA_XID *cx = &candidate->xa_xid;
        if (cx->formatID != xid->formatID) continue;
        if (cx->gtrid_length != xid->gtrid_length) continue;
        if (cx->bqual_length != xid->bqual_length) continue;
        if (memcmp(cx->data, xid->data, xid->gtrid_length + xid->bqual_length) != 0) continue;
        *txnp = candidate->container_db_txn;
        return 0;
    }
    return DB_NOTFOUND;
}

525
// XA recover cursor over the logger's prepared txns.
// - DB_FIRST restarts the scan: every txn already handed out goes back onto
//   the prepared list first.
// - DB_NEXT continues the scan.
// - Any other flag returns EINVAL.
// Fills up to count entries of preplist, moving each reported txn to the
// prepared_and_returned list. Stores the number filled in *retp and returns 0.
int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) {
    if (flags == DB_FIRST) {
        while (!toku_list_empty(&logger->prepared_and_returned_txns)) {
            struct toku_list *returned = toku_list_head(&logger->prepared_and_returned_txns);
            toku_list_remove(returned);
            toku_list_push(&logger->prepared_txns, returned);
        }
    } else if (flags != DB_NEXT) {
        return EINVAL;
    }

    long filled = 0;
    while (filled < count && !toku_list_empty(&logger->prepared_txns)) {
        struct toku_list *link = toku_list_head(&logger->prepared_txns);
        toku_list_remove(link);
        toku_list_push(&logger->prepared_and_returned_txns, link);
        TOKUTXN txn = toku_list_struct(link, struct tokutxn, prepared_txns_link);
        assert(txn->container_db_txn);
        preplist[filled].txn = txn->container_db_txn;
        preplist[filled].xid = txn->xa_xid;
        filled++;
    }
    *retp = filled;
    return 0;
}

554
struct txn_fsync_log_info {
555 556
    TOKULOGGER logger;
    LSN do_fsync_lsn;
557 558 559 560 561
    int r;
};

static void do_txn_fsync_log(void *thunk) {
    struct txn_fsync_log_info *info = (struct txn_fsync_log_info *) thunk;
562
    info->r = toku_logger_fsync_if_lsn_not_fsynced(info->logger, info->do_fsync_lsn);
563 564
}

565
// If there is a logger and do_fsync is set, run do_txn_fsync_log (fsync up to
// do_fsync_lsn) through yield(do_txn_fsync_log, &info, yieldv) and return its
// result. Otherwise do nothing and return 0.
int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync, YIELDF yield, void *yieldv) {
    if (!logger || !do_fsync) {
        return 0;
    }
    struct txn_fsync_log_info info = {
        .logger = logger,
        .do_fsync_lsn = do_fsync_lsn,
        .r = 0,
    };
    yield(do_txn_fsync_log, &info, yieldv);
    return info.r;
}

575 576 577 578 579
// Report the fsync decision recorded on ttxn by commit/abort/prepare: whether
// an fsync is needed, and the LSN stored in do_fsync_lsn by the logging call.
void toku_txn_get_fsync_info(TOKUTXN ttxn, BOOL* do_fsync, LSN* do_fsync_lsn) {
    *do_fsync = ttxn->do_fsync;
    *do_fsync_lsn = ttxn->do_fsync_lsn;
}

580
// Fully close txn in two steps. First toku_txn_complete_txn unlinks it from the
// logger's bookkeeping and its FTs. Then toku_txn_destroy_txn frees it.
void toku_txn_close_txn(TOKUTXN txn) {
    toku_txn_complete_txn(txn);   // txn is still valid here
    toku_txn_destroy_txn(txn);    // txn is freed; do not touch it afterwards
}
584

585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665
// OMT iteration callback, run once per xid (live_xidv) on the closing txn's live list.
// For each xid on the closing txn's live list, find the corresponding entry in the reverse live list.
// There must be one.
// If the second xid in the pair is not the xid of the closing transaction, then the second xid must be newer
// than the closing txn, and there is nothing to be done (except to assert the invariant).
// If the second xid in the pair is the xid of the closing transaction, then we need to find the next oldest
// txn.  If the live_xid is in the live list of the next oldest txn, then set the next oldest txn as the 
// second xid in the pair, otherwise delete the entry from the reverse live list.
// "Next oldest" is taken from logger->snapshot_txnids. toku_txn_complete_txn removes
// the closing xid from that list before iterating, so the search cannot find it.
// Returns r, which is 0 on every path that reaches the final return.
static int
live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), void*txnv) {
    TOKUTXN txn = txnv;
    TXNID xid = txn->txnid64;          // xid of txn that is closing
    TXNID live_xid = (TXNID)live_xidv;       // xid on closing txn's live list
    OMTVALUE pairv;
    XID_PAIR pair;
    uint32_t idx;

    int r;
    OMT reverse = txn->logger->live_list_reverse;
    r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, (void *)live_xid, &pairv, &idx);
    invariant(r==0);
    pair = pairv;
    invariant(pair->xid1 == live_xid); //sanity check
    if (pair->xid2 == xid) {
        //There is a record that needs to be either deleted or updated
        TXNID olderxid;
        OMTVALUE olderv;
        uint32_t olderidx;
        OMT snapshot = txn->logger->snapshot_txnids;
        BOOL should_delete = TRUE;
        // find the youngest txn in snapshot that is older than xid
        // (direction -1: the largest element ordered before xid)
        r = toku_omt_find(snapshot, toku_find_xid_by_xid, (OMTVALUE) xid, -1, &olderv, &olderidx);
        if (r==0) {
            //There is an older txn
            olderxid = (TXNID) olderv;
            invariant(olderxid < xid);
            // live_xid began before olderxid and was still live when the newer xid
            // began, so it was live when olderxid began too: it is on olderxid's live list.
            if (olderxid >= live_xid) {
                //older txn is new enough, we need to update.
                pair->xid2 = olderxid;
                should_delete = FALSE;
            }
        }
        else {
            invariant(r==DB_NOTFOUND);
        }
        if (should_delete) {
            //Delete record
            // Resets r to 0, including when the find above returned DB_NOTFOUND.
            toku_free(pair);
            r = toku_omt_delete_at(reverse, idx);
            invariant(r==0);
        }
    }
    else {
        invariant(pair->xid2 > xid);
    }
    return r;
}

// When a txn that owns a snapshot ends, fix up the reverse live list. For every
// xid on the closing txn's live list, live_list_reverse_note_txn_end_iter
// updates or deletes the matching pair. The iteration must succeed.
static int
live_list_reverse_note_txn_end(TOKUTXN txn) {
    int r = toku_omt_iterate(txn->live_root_txn_list, live_list_reverse_note_txn_end_iter, txn);
    invariant(r==0);
    return r;
}


// Heaviside function for locating a TOKUTXN in an OMT of TOKUTXNs ordered by
// txnid64. Returns -1 if v's xid is smaller than the target's, +1 if larger,
// and 0 on a match. Used to find the index of a txn.
static int find_xid (OMTVALUE v, void *txnv) {
    TOKUTXN candidate = v;
    TOKUTXN target = txnv;
    if (candidate->txnid64 == target->txnid64) {
        return 0;
    }
    return (candidate->txnid64 < target->txnid64) ? -1 : +1;
}

// Effect: Called (via note_txn_closing) for every FT the closing transaction used.
//  Detaches the transaction from that FT, in two steps:
//  - clear any FT field that still names this txn, either as the one that
//    created/locked the FT when empty or as the one that suppressed recovery logs;
//  - drop the FT's reference to the txn.
//  Always returns 0.
static int remove_txn (OMTVALUE hv, u_int32_t UU(idx), void *txnv)
{
    FT ft = hv;
    TOKUTXN txn = txnv;
    TXNID closing_xid = txn->txnid64;

    if (ft->txnid_that_created_or_locked_when_empty == closing_xid) {
        ft->txnid_that_created_or_locked_when_empty = TXNID_NONE;
        ft->root_that_created_or_locked_when_empty  = TXNID_NONE;
    }
    if (ft->txnid_that_suppressed_recovery_logs == closing_xid) {
        ft->txnid_that_suppressed_recovery_logs = TXNID_NONE;
    }
    toku_ft_remove_txn_ref(ft, txn);
    return 0;
}

// Detach txn from every FT in its open_fts list (see remove_txn).
static void note_txn_closing (TOKUTXN txn) {
    OMT fts = txn->open_fts;
    toku_omt_iterate(fts, remove_txn, txn);
}

686
// Unlink a finished txn from all of the logger's bookkeeping; the txn itself is
// not freed (see toku_txn_destroy_txn). Asserts that the txn holds no rollback
// log nodes. Under logger->txn_list_lock it:
// - removes the txn from live_txns (and, for a root txn, from live_root_txns),
// - for a txn that owns a snapshot, removes its xid from snapshot_txnids,
//   updates live_list_reverse and destroys its private live list,
// - recomputes the logger's oldest living xid/starttime.
// The recomputation reads live_txns, so it now happens before the lock is
// released rather than after. Finally the txn is detached from every FT it used.
void toku_txn_complete_txn(TOKUTXN txn) {
    assert(txn->spilled_rollback_head.b == ROLLBACK_NONE.b);
    assert(txn->spilled_rollback_tail.b == ROLLBACK_NONE.b);
    assert(txn->current_rollback.b == ROLLBACK_NONE.b);
    int r;
    TOKULOGGER logger = txn->logger;
    toku_mutex_lock(&logger->txn_list_lock);
    {
        {
            //Remove txn from list (omt) of live transactions
            OMTVALUE txnagain;
            u_int32_t idx;
            r = toku_omt_find_zero(logger->live_txns, find_xid, txn, &txnagain, &idx);
            assert(r==0);
            assert(txn==txnagain);
            r = toku_omt_delete_at(logger->live_txns, idx);
            assert(r==0);
        }

        if (txn->parent==NULL) {
            OMTVALUE v;
            u_int32_t idx;
            //Remove txn from list of live root txns
            r = toku_omt_find_zero(logger->live_root_txns, toku_find_xid_by_xid, (OMTVALUE)txn->txnid64, &v, &idx);
            assert(r==0);
            TXNID xid = (TXNID) v;
            invariant(xid == txn->txnid64);
            r = toku_omt_delete_at(logger->live_root_txns, idx);
            assert(r==0);
        }
        //
        // if this txn created a snapshot, make necessary modifications to list of snapshot txnids and live_list_reverse
        // the order of operations is important. We first remove the txnid from the list of snapshot txnids. This is
        // necessary because root snapshot transactions are in their own live lists. If we do not remove 
        // the txnid from the snapshot txnid list first, then when we go to make the modifications to 
        // live_list_reverse, we have trouble. We end up never removing (id, id) from live_list_reverse
        //
        if (txn->snapshot_type != TXN_SNAPSHOT_NONE && (txn->parent==NULL || txn->snapshot_type == TXN_SNAPSHOT_CHILD)) {
            {
                u_int32_t idx;
                OMTVALUE v;
                //Free memory used for snapshot_txnids
                r = toku_omt_find_zero(logger->snapshot_txnids, toku_find_xid_by_xid, (OMTVALUE) txn->txnid64, &v, &idx);
                invariant(r==0);
                TXNID xid = (TXNID) v;
                invariant(xid == txn->txnid64);
                r = toku_omt_delete_at(logger->snapshot_txnids, idx);
                invariant(r==0);
            }
            live_list_reverse_note_txn_end(txn);
            {
                //Free memory used for live root txns local list
                invariant(toku_omt_size(txn->live_root_txn_list) > 0);
                toku_omt_destroy(&txn->live_root_txn_list);
            }
        }

        // Recompute the oldest living xid while live_txns is still protected by
        // txn_list_lock. live_txns is ordered by xid, so index 0 is the oldest.
        assert(logger->oldest_living_xid <= txn->txnid64);
        if (txn->txnid64 == logger->oldest_living_xid) {
            OMTVALUE oldest_txnv;
            r = toku_omt_fetch(logger->live_txns, 0, &oldest_txnv);
            if (r==0) {
                TOKUTXN oldest_txn = oldest_txnv;
                assert(oldest_txn != txn); // We just removed it
                assert(oldest_txn->txnid64 > logger->oldest_living_xid); //Must be newer than the previous oldest
                logger->oldest_living_xid = oldest_txn->txnid64;
                logger->oldest_living_starttime = oldest_txn->starttime;
            }
            else {
                //No living transactions
                assert(r==EINVAL);
                logger->oldest_living_xid = TXNID_NONE_LIVING;
                logger->oldest_living_starttime = 0;
            }
        }
    }
    toku_mutex_unlock(&logger->txn_list_lock);

    note_txn_closing(txn);
}
766

767
// Free the resources hanging off txn (open_fts OMT, xids, ignore list) and the
// txn struct itself, then bump the close / open engine-status counters.
// When garbage_collection_debug is set, the logger's snapshot bookkeeping is
// cross-checked with verify_snapshot_system() before anything is released.
void toku_txn_destroy_txn(TOKUTXN txn) {
    if (garbage_collection_debug) {
        verify_snapshot_system(txn->logger);
    }

    if (txn->open_fts != NULL) {
        toku_omt_destroy(&txn->open_fts);
    }
    xids_destroy(&txn->xids);
    toku_txn_ignore_free(txn); // 2954
    toku_free(txn);

    STATUS_VALUE(TXN_CLOSE)++;
    STATUS_VALUE(TXN_NUM_OPEN)--;
}
780 781 782 783 784 785

XIDS toku_txn_get_xids (TOKUTXN txn) {
    // A NULL txn maps to the root XIDS; otherwise hand back the txn's own.
    return (txn != NULL) ? txn->xids : xids_get_root_xids();
}

786
// Is a strictly older (numerically smaller) than b?
// Plain integer comparison: id wraparound is not handled.
// TODO need modulo 64 arithmetic
BOOL toku_txnid_older(TXNID a, TXNID b) {
    return (BOOL)(b > a);
}

790 791 792 793
// Is a strictly newer (numerically larger) than b?
// Plain integer comparison: id wraparound is not handled.
// TODO need modulo 64 arithmetic
BOOL toku_txnid_newer(TXNID a, TXNID b) {
    return (BOOL)(b < a);
}

794 795 796
// Do a and b name the same transaction id?
BOOL toku_txnid_eq(TXNID a, TXNID b) {
    return (BOOL)(b == a);
}
Rich Prohaska's avatar
Rich Prohaska committed
797 798 799 800

// Set txn->force_fsync_on_commit.  NOTE(review): presumably read by the commit
// path to force a log fsync regardless of other settings; confirm at the reader.
void toku_txn_force_fsync_on_commit(TOKUTXN txn) {
    BOOL force = TRUE;
    txn->force_fsync_on_commit = force;
}
801 802 803 804 805 806

// Return element 0 of txn->live_root_txn_list, which must be non-empty.
// NOTE(review): "oldest" assumes the OMT is ordered by ascending xid.
TXNID toku_get_oldest_in_live_root_txn_list(TOKUTXN txn) {
    OMT live_roots = txn->live_root_txn_list;
    invariant(toku_omt_size(live_roots) > 0);

    OMTVALUE first;
    int r = toku_omt_fetch(live_roots, 0, &first);
    assert_zero(r);
    return (TXNID) first;
}

Zardosht Kasheff's avatar
Zardosht Kasheff committed
813
// Report whether xid is present in live_root_txn_list.
// Any lookup result other than "found" or DB_NOTFOUND trips an invariant.
BOOL toku_is_txn_in_live_root_txn_list(OMT live_root_txn_list, TXNID xid) {
    OMTVALUE found_v;
    uint32_t found_idx;
    int r = toku_omt_find_zero(live_root_txn_list, toku_find_xid_by_xid, (void *)xid, &found_v, &found_idx);
    if (r != 0) {
        invariant(r==DB_NOTFOUND);
        return FALSE;
    }
    // The matching element must hold exactly the xid we searched for.
    invariant((TXNID)found_v == xid);
    return TRUE;
}

static void
verify_snapshot_system(TOKULOGGER logger) {
    int     num_snapshot_txnids = toku_omt_size(logger->snapshot_txnids);
    TXNID       snapshot_txnids[num_snapshot_txnids];
    int     num_live_txns = toku_omt_size(logger->live_txns);
    TOKUTXN     live_txns[num_live_txns];
    int     num_live_list_reverse = toku_omt_size(logger->live_list_reverse);
    XID_PAIR    live_list_reverse[num_live_list_reverse];

    int r;
    int i;
    int j;
    //set up arrays for easier access
    for (i = 0; i < num_snapshot_txnids; i++) {
        OMTVALUE v;
844
        r = toku_omt_fetch(logger->snapshot_txnids, i, &v);
845
        assert_zero(r);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
846
        snapshot_txnids[i] = (TXNID) v;
847 848 849
    }
    for (i = 0; i < num_live_txns; i++) {
        OMTVALUE v;
850
        r = toku_omt_fetch(logger->live_txns, i, &v);
851
        assert_zero(r);
852 853 854 855
        live_txns[i] = v;
    }
    for (i = 0; i < num_live_list_reverse; i++) {
        OMTVALUE v;
856
        r = toku_omt_fetch(logger->live_list_reverse, i, &v);
857
        assert_zero(r);
858 859 860 861 862 863 864 865 866 867
        live_list_reverse[i] = v;
    }

    {
        //Verify snapshot_txnids
        for (i = 0; i < num_snapshot_txnids; i++) {
            TXNID snapshot_xid = snapshot_txnids[i];
            invariant(is_txnid_live(logger, snapshot_xid));
            TOKUTXN snapshot_txn;
            r = toku_txnid2txn(logger, snapshot_xid, &snapshot_txn);
868
            assert_zero(r);
869 870 871 872 873
            int   num_live_root_txn_list = toku_omt_size(snapshot_txn->live_root_txn_list);
            TXNID     live_root_txn_list[num_live_root_txn_list];
            {
                for (j = 0; j < num_live_root_txn_list; j++) {
                    OMTVALUE v;
874
                    r = toku_omt_fetch(snapshot_txn->live_root_txn_list, j, &v);
875
                    assert_zero(r);
Zardosht Kasheff's avatar
Zardosht Kasheff committed
876
                    live_root_txn_list[j] = (TXNID)v;
877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902
                }
            }
            for (j = 0; j < num_live_root_txn_list; j++) {
                TXNID live_xid = live_root_txn_list[j];
                invariant(live_xid <= snapshot_xid);
                TXNID youngest = toku_get_youngest_live_list_txnid_for(
                    live_xid, 
                    logger->live_list_reverse
                    );
                invariant(youngest!=TXNID_NONE);
                invariant(youngest>=snapshot_xid);
            }
        }
    }
    {
        //Verify live_list_reverse
        for (i = 0; i < num_live_list_reverse; i++) {
            XID_PAIR pair = live_list_reverse[i];
            invariant(pair->xid1 <= pair->xid2);

            {
                //verify pair->xid2 is in snapshot_xids
                u_int32_t index;
                OMTVALUE v2;
                r = toku_omt_find_zero(logger->snapshot_txnids,
                                       toku_find_xid_by_xid,
Zardosht Kasheff's avatar
Zardosht Kasheff committed
903
                                       (OMTVALUE) pair->xid2, &v2, &index);
904
                assert_zero(r);
905 906 907 908 909 910
            }
            for (j = 0; j < num_live_txns; j++) {
                TOKUTXN txn = live_txns[j];
                if (txn->snapshot_type != TXN_SNAPSHOT_NONE) {
                    BOOL expect = txn->snapshot_txnid64 >= pair->xid1 &&
                                  txn->snapshot_txnid64 <= pair->xid2;
Zardosht Kasheff's avatar
Zardosht Kasheff committed
911
                    BOOL found = toku_is_txn_in_live_root_txn_list(txn->live_root_txn_list, pair->xid1);
912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928
                    invariant((expect==FALSE) == (found==FALSE));
                }
            }
        }
    }
    {
        //Verify live_txns
        for (i = 0; i < num_live_txns; i++) {
            TOKUTXN txn = live_txns[i];

            BOOL expect = txn->snapshot_txnid64 == txn->txnid64;
            {
                //verify pair->xid2 is in snapshot_xids
                u_int32_t index;
                OMTVALUE v2;
                r = toku_omt_find_zero(logger->snapshot_txnids,
                                       toku_find_xid_by_xid,
Zardosht Kasheff's avatar
Zardosht Kasheff committed
929
                                       (OMTVALUE) txn->txnid64, &v2, &index);
930 931 932 933 934 935 936 937
                invariant(r==0 || r==DB_NOTFOUND);
                invariant((r==0) == (expect!=0));
            }

        }
    }
}

938 939 940 941 942 943 944
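// Routines for checking whether rollback errors should be ignored because a hot
// index create was aborted (ticket 2954).  The set of FILENUMs to ignore is kept
// in txn->ignore_errors.  toku_txn_ignore_init() below returns nothing; it only
// resets that set to empty.  All of these routines assert that txn is non-NULL.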
945 946
// Reset txn's ignore-errors set to empty with no backing storage allocated.
// txn must be non-NULL (asserted).
void toku_txn_ignore_init(TOKUTXN txn) {
    assert(txn);
    TXN_IGNORE ig = &txn->ignore_errors;
    ig->filenums.filenums = NULL;
    ig->filenums.num = 0;
    ig->fns_allocated = 0;
}

953 954
// Release the storage behind txn's ignore-errors set and return the set to
// the same empty state toku_txn_ignore_init() produces.
// txn must be non-NULL (asserted).
void toku_txn_ignore_free(TOKUTXN txn) {
    assert(txn);
    TXN_IGNORE txni = &(txn->ignore_errors);
    toku_free(txni->filenums.filenums);
    txni->filenums.num = 0;
    txni->filenums.filenums = NULL;
    // Reset the capacity too.  Otherwise a later toku_txn_ignore_add() sees
    // num != fns_allocated, skips allocation, and writes through the NULL array.
    txni->fns_allocated = 0;
}

// returns 
//      0 on success
//      ENOMEM if can't alloc memory
//      EINVAL if txn = NULL
//      -1 on other errors
966 967
int toku_txn_ignore_add(TOKUTXN txn, FILENUM filenum) {
    assert(txn);
968 969 970 971
    // check for dups
    if ( toku_txn_ignore_contains(txn, filenum) == 0 ) return 0;
    // alloc more space if needed
    const int N = 2;
972
    TXN_IGNORE txni = &(txn->ignore_errors);
973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994
    if ( txni->filenums.num == txni->fns_allocated ) {
        if ( txni->fns_allocated == 0 ) {
            CALLOC_N(N, txni->filenums.filenums);
            if ( txni->filenums.filenums == NULL ) return ENOMEM;
            txni->fns_allocated = N;
        }
        else {
            XREALLOC_N(txni->fns_allocated * N, txni->filenums.filenums);
            txni->fns_allocated = txni->fns_allocated * N;
        }
    }
    txni->filenums.num++;
    txni->filenums.filenums[txni->filenums.num - 1].fileid = filenum.fileid; 

    return 0;
}

// returns 
//      0 on success
//      ENOENT if not found
//      EINVAL if txn = NULL
//      -1 on other errors
995
// THIS FUNCTION IS NOT USED IN FUNCTIONAL CODE, BUT IS USEFUL FOR TESTING
996 997
int toku_txn_ignore_remove(TOKUTXN txn, FILENUM filenum) {
    assert(txn);
998
    TXN_IGNORE txni = &(txn->ignore_errors);
999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020
    int found_fn = 0;
    if ( txni->filenums.num == 0 ) return ENOENT;
    for(uint32_t i=0; i<txni->filenums.num; i++) {
        if ( !found_fn ) {
            if ( txni->filenums.filenums[i].fileid == filenum.fileid ) {
                found_fn = 1;
            }
        }
        else { // remove bubble in array
            txni->filenums.filenums[i-1].fileid = txni->filenums.filenums[i].fileid;
        }
    }
    if ( !found_fn ) return ENOENT;
    txni->filenums.num--;
    return 0;
}

// returns 
//      0 on success
//      ENOENT if not found
//      EINVAL if txn = NULL
//      -1 on other errors
1021 1022
int toku_txn_ignore_contains(TOKUTXN txn, FILENUM filenum) {
    assert(txn);
1023
    TXN_IGNORE txni = &(txn->ignore_errors);
1024 1025 1026 1027 1028 1029 1030
    for(uint32_t i=0; i<txni->filenums.num; i++) {
        if ( txni->filenums.filenums[i].fileid == filenum.fileid ) {
            return 0;
        }
    }
    return ENOENT;
}
1031 1032 1033 1034 1035

// Accessor: the txn's current state field.
TOKUTXN_STATE
toku_txn_get_state(TOKUTXN txn) {
    TOKUTXN_STATE state = txn->state;
    return state;
}
1036

1037 1038 1039 1040 1041 1042 1043
// Run at load time (GCC constructor attribute): tell Helgrind not to check
// accesses to txn_status.  The engine-status counters are for human display
// and are not required to be perfectly thread-safe, so races on them are
// expected.  <valgrind/helgrind.h> is already included at the top of this file.
void __attribute__((__constructor__)) toku_txn_status_helgrind_ignore(void);
void
toku_txn_status_helgrind_ignore(void) {
    VALGRIND_HG_DISABLE_CHECKING(&txn_status, sizeof txn_status);
}

1044
#undef STATUS_VALUE