Commit e4ef93b4 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

[t:4686] [t:5123] [t:5062] merge #4686 to main, includes fix for #5123. closes #4686.

git-svn-id: file:///svn/toku/tokudb@44814 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7146ed7f
...@@ -465,6 +465,7 @@ static void print_db_txn_struct (void) { ...@@ -465,6 +465,7 @@ static void print_db_txn_struct (void) {
"int (*commit_with_progress)(DB_TXN*, uint32_t, TXN_PROGRESS_POLL_FUNCTION, void*)", "int (*commit_with_progress)(DB_TXN*, uint32_t, TXN_PROGRESS_POLL_FUNCTION, void*)",
"int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*)", "int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*)",
"int (*xa_prepare) (DB_TXN*, TOKU_XA_XID *)", "int (*xa_prepare) (DB_TXN*, TOKU_XA_XID *)",
"u_int64_t (*id64) (DB_TXN*)",
NULL}; NULL};
sort_and_dump_fields("db_txn", false, extra); sort_and_dump_fields("db_txn", false, extra);
} }
......
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "test.h"
#include "includes.h"
#include "toku_os.h"
#include "checkpoint.h"
#define TESTDIR __SRCFILE__ ".dir"
#define FILENAME "test0.ft"
#include "test-ft-txns.h"
static void test_5123(void) {
TOKULOGGER logger;
CACHETABLE ct;
test_setup(&logger, &ct);
int r;
TOKUTXN txn[3];
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn[0], logger, TXN_SNAPSHOT_ROOT);
CKERR(r);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn[1], logger, TXN_SNAPSHOT_ROOT);
CKERR(r);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn[2], logger, TXN_SNAPSHOT_ROOT);
CKERR(r);
toku_maybe_log_begin_txn_for_write_operation(txn[0]);
toku_maybe_log_begin_txn_for_write_operation(txn[2]);
toku_maybe_log_begin_txn_for_write_operation(txn[1]);
r = toku_txn_commit_txn(txn[1], FALSE, NULL, NULL);
CKERR(r);
r = toku_logger_close_rollback(logger, false);
CKERR(r);
r = toku_cachetable_close(&ct);
CKERR(r);
// "Crash"
r = toku_logger_close(&logger);
CKERR(r);
ct = NULL;
logger = NULL;
// "Recover"
test_setup_and_recover(&logger, &ct);
shutdown_after_recovery(&logger, &ct);
}
int test_main (int argc, const char *argv[]) {
default_parse_args(argc, argv);
test_5123();
return 0;
}
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ifndef TEST_FT_TXNS_H
#define TEST_FT_TXNS_H
#ident "$Id: xid_lsn_independent.c 44752 2012-06-20 21:59:39Z yfogel $"
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if !defined(TESTDIR) || !defined(FILENAME)
# error "must define TESTDIR and FILENAME"
#endif
static inline void
test_setup(TOKULOGGER *loggerp, CACHETABLE *ctp) {
*loggerp = NULL;
*ctp = NULL;
int r;
r = system("rm -rf " TESTDIR);
CKERR(r);
r = toku_os_mkdir(TESTDIR, S_IRWXU);
CKERR(r);
r = toku_logger_create(loggerp);
CKERR(r);
TOKULOGGER logger = *loggerp;
r = toku_logger_open(TESTDIR, logger);
CKERR(r);
r = toku_create_cachetable(ctp, 0, ZERO_LSN, logger);
CKERR(r);
CACHETABLE ct = *ctp;
toku_cachetable_set_env_dir(ct, TESTDIR);
toku_logger_set_cachetable(logger, ct);
r = toku_logger_open_rollback(logger, ct, TRUE);
CKERR(r);
r = toku_checkpoint(ct, logger, NULL, NULL, NULL, NULL, STARTUP_CHECKPOINT);
CKERR(r);
}
static inline void
xid_lsn_keep_cachetable_callback (DB_ENV *env, CACHETABLE cachetable) {
CACHETABLE *ctp = (void*)env;
*ctp = cachetable;
}
static inline void test_setup_and_recover(TOKULOGGER *loggerp, CACHETABLE *ctp) {
int r;
TOKULOGGER logger = NULL;
CACHETABLE ct = NULL;
r = toku_logger_create(&logger);
CKERR(r);
void *ctv = &ct; // Use intermediate void* to avoid compiler warning.
r = tokudb_recover(ctv,
NULL_prepared_txn_callback,
xid_lsn_keep_cachetable_callback,
logger,
TESTDIR, TESTDIR, 0, 0, 0, NULL, 0);
CKERR(r);
if (!toku_logger_is_open(logger)) {
//Did not need recovery.
invariant(ct==NULL);
r = toku_logger_open(TESTDIR, logger);
CKERR(r);
r = toku_create_cachetable(&ct, 0, ZERO_LSN, logger);
CKERR(r);
toku_logger_set_cachetable(logger, ct);
}
*ctp = ct;
*loggerp = logger;
}
static inline void clean_shutdown(TOKULOGGER *loggerp, CACHETABLE *ctp) {
int r;
r = toku_checkpoint(*ctp, *loggerp, NULL, NULL, NULL, NULL, SHUTDOWN_CHECKPOINT);
CKERR(r);
r = toku_logger_close_rollback(*loggerp, false);
CKERR(r);
r = toku_checkpoint(*ctp, *loggerp, NULL, NULL, NULL, NULL, SHUTDOWN_CHECKPOINT);
CKERR(r);
r = toku_logger_shutdown(*loggerp);
CKERR(r);
r = toku_cachetable_close(ctp);
CKERR(r);
r = toku_logger_close(loggerp);
CKERR(r);
}
static inline void shutdown_after_recovery(TOKULOGGER *loggerp, CACHETABLE *ctp) {
int r;
r = toku_logger_close_rollback(*loggerp, false);
CKERR(r);
r = toku_cachetable_close(ctp);
CKERR(r);
r = toku_logger_close(loggerp);
CKERR(r);
}
#endif /* TEST_FT_TXNS_H */
...@@ -13,99 +13,7 @@ ...@@ -13,99 +13,7 @@
#define TESTDIR __SRCFILE__ ".dir" #define TESTDIR __SRCFILE__ ".dir"
#define FILENAME "test0.ft" #define FILENAME "test0.ft"
static void test_setup(TOKULOGGER *loggerp, CACHETABLE *ctp) { #include "test-ft-txns.h"
*loggerp = NULL;
*ctp = NULL;
int r;
r = system("rm -rf " TESTDIR);
CKERR(r);
r = toku_os_mkdir(TESTDIR, S_IRWXU);
CKERR(r);
r = toku_logger_create(loggerp);
CKERR(r);
TOKULOGGER logger = *loggerp;
r = toku_logger_open(TESTDIR, logger);
CKERR(r);
r = toku_create_cachetable(ctp, 0, ZERO_LSN, logger);
CKERR(r);
CACHETABLE ct = *ctp;
toku_cachetable_set_env_dir(ct, TESTDIR);
toku_logger_set_cachetable(logger, ct);
r = toku_logger_open_rollback(logger, ct, TRUE);
CKERR(r);
r = toku_checkpoint(ct, logger, NULL, NULL, NULL, NULL, STARTUP_CHECKPOINT);
CKERR(r);
}
static void
xid_lsn_keep_cachetable_callback (DB_ENV *env, CACHETABLE cachetable) {
CACHETABLE *ctp = (void*)env;
*ctp = cachetable;
}
static void test_setup_and_recover(TOKULOGGER *loggerp, CACHETABLE *ctp) {
int r;
TOKULOGGER logger = NULL;
CACHETABLE ct = NULL;
r = toku_logger_create(&logger);
CKERR(r);
void *ctv = &ct; // Use intermediate void* to avoid compiler warning.
r = tokudb_recover(ctv,
NULL_prepared_txn_callback,
xid_lsn_keep_cachetable_callback,
logger,
TESTDIR, TESTDIR, 0, 0, 0, NULL, 0);
CKERR(r);
if (!toku_logger_is_open(logger)) {
//Did not need recovery.
invariant(ct==NULL);
r = toku_logger_open(TESTDIR, logger);
CKERR(r);
r = toku_create_cachetable(&ct, 0, ZERO_LSN, logger);
CKERR(r);
toku_logger_set_cachetable(logger, ct);
}
*ctp = ct;
*loggerp = logger;
}
static void clean_shutdown(TOKULOGGER *loggerp, CACHETABLE *ctp) {
int r;
r = toku_checkpoint(*ctp, *loggerp, NULL, NULL, NULL, NULL, SHUTDOWN_CHECKPOINT);
CKERR(r);
r = toku_logger_close_rollback(*loggerp, false);
CKERR(r);
r = toku_checkpoint(*ctp, *loggerp, NULL, NULL, NULL, NULL, SHUTDOWN_CHECKPOINT);
CKERR(r);
r = toku_logger_shutdown(*loggerp);
CKERR(r);
r = toku_cachetable_close(ctp);
CKERR(r);
r = toku_logger_close(loggerp);
CKERR(r);
}
static void shutdown_after_recovery(TOKULOGGER *loggerp, CACHETABLE *ctp) {
int r;
r = toku_logger_close_rollback(*loggerp, false);
CKERR(r);
r = toku_cachetable_close(ctp);
CKERR(r);
r = toku_logger_close(loggerp);
CKERR(r);
}
static void do_txn(TOKULOGGER logger, bool readonly) { static void do_txn(TOKULOGGER logger, bool readonly) {
int r; int r;
...@@ -310,7 +218,7 @@ static void test_xid_lsn_independent_parents(int N) { ...@@ -310,7 +218,7 @@ static void test_xid_lsn_independent_parents(int N) {
int test_main (int argc, const char *argv[]) { int test_main (int argc, const char *argv[]) {
default_parse_args(argc, argv); default_parse_args(argc, argv);
for (int i=1; i<=128; i *= 2) { for (int i=1; i<=128; i *= 2) {
test_xid_lsn_independent(i); test_xid_lsn_independent(i);
test_xid_lsn_independent_crash_recovery(i); test_xid_lsn_independent_crash_recovery(i);
test_xid_lsn_independent_shutdown_recovery(i); test_xid_lsn_independent_shutdown_recovery(i);
test_xid_lsn_independent_parents(i); test_xid_lsn_independent_parents(i);
......
...@@ -63,6 +63,12 @@ toku_txn_unlock(TOKUTXN txn) ...@@ -63,6 +63,12 @@ toku_txn_unlock(TOKUTXN txn)
toku_mutex_unlock(&txn->txn_lock); toku_mutex_unlock(&txn->txn_lock);
} }
u_int64_t
toku_txn_get_id(TOKUTXN txn)
{
return txn->txnid64;
}
int int
toku_txn_begin_txn ( toku_txn_begin_txn (
DB_TXN *container_db_txn, DB_TXN *container_db_txn,
......
...@@ -14,6 +14,8 @@ extern "C" { ...@@ -14,6 +14,8 @@ extern "C" {
void toku_txn_lock(TOKUTXN txn); void toku_txn_lock(TOKUTXN txn);
void toku_txn_unlock(TOKUTXN txn); void toku_txn_unlock(TOKUTXN txn);
u_int64_t toku_txn_get_id(TOKUTXN txn);
int toku_txn_begin_txn ( int toku_txn_begin_txn (
DB_TXN *container_db_txn, DB_TXN *container_db_txn,
TOKUTXN parent_tokutxn, TOKUTXN parent_tokutxn,
......
...@@ -314,16 +314,33 @@ setup_live_root_txn_list(TXN_MANAGER txn_manager, TOKUTXN txn) { ...@@ -314,16 +314,33 @@ setup_live_root_txn_list(TXN_MANAGER txn_manager, TOKUTXN txn) {
return r; return r;
} }
// Add this txn to the global list of txns that have their own snapshots. //Heaviside function to search through an OMT by a TXNID
// (Note, if a txn is a child that creates its own snapshot, then that child xid
// is the xid stored in the global list.)
static int static int
snapshot_txnids_note_txn(TXN_MANAGER txn_manager, TOKUTXN txn) { find_by_xid (OMTVALUE v, void *txnidv) {
TOKUTXN txn = v;
TXNID txnidfind = (TXNID)txnidv;
if (txn->txnid64<txnidfind) return -1;
if (txn->txnid64>txnidfind) return +1;
return 0;
}
static void
omt_insert_at_end_unless_recovery(OMT omt, int (*h)(OMTVALUE, void*extra), TOKUTXN txn, OMTVALUE v, bool for_recovery)
// Effect: insert v into omt that is sorted by xid gotten from txn.
// Rationale:
// During recovery, we get txns in the order that they did their first
// write operation, which is not necessarily monotonically increasing.
// During normal operation, txns are created with strictly increasing
// txnids, so we can always insert at the end.
{
int r; int r;
OMT txnids = txn_manager->snapshot_txnids; uint32_t idx = toku_omt_size(omt);
r = toku_omt_insert_at(txnids, (OMTVALUE) txn->txnid64, toku_omt_size(txnids)); if (for_recovery) {
assert_zero(r); r = toku_omt_find_zero(omt, h, (void *) txn->txnid64, NULL, &idx);
return r; invariant(r==DB_NOTFOUND);
}
r = toku_omt_insert_at(omt, v, idx);
lazy_assert_zero(r);
} }
static TXNID static TXNID
...@@ -382,19 +399,25 @@ int toku_txn_manager_start_txn( ...@@ -382,19 +399,25 @@ int toku_txn_manager_start_txn(
toku_txn_update_xids_in_txn(txn, xid, xids); toku_txn_update_xids_in_txn(txn, xid, xids);
if (toku_omt_size(txn_manager->live_txns) == 0) { if (!for_recovery) {
assert(txn_manager->oldest_living_xid == TXNID_NONE_LIVING); // TODO(leif): this would be WRONG during recovery. We cannot
txn_manager->oldest_living_xid = txn->txnid64; // assume that the first xbegin we see is the oldest, because it's
txn_manager->oldest_living_starttime = txn->starttime; // just the txn that *did a write* first. Right now, this cached
// value is only used for engine status, so we're not too worried.
// Maybe we just kill this (txn_manager->oldest_living_*). Also
// see comment on txn_manager_get_oldest_living_xid_unlocked.
if (toku_omt_size(txn_manager->live_txns) == 0) {
assert(txn_manager->oldest_living_xid == TXNID_NONE_LIVING);
txn_manager->oldest_living_xid = txn->txnid64;
txn_manager->oldest_living_starttime = txn->starttime;
}
assert(txn_manager->oldest_living_xid <= txn->txnid64);
} }
assert(txn_manager->oldest_living_xid <= txn->txnid64);
{ //Add txn to list (omt) of live transactions
//Add txn to list (omt) of live transactions omt_insert_at_end_unless_recovery(txn_manager->live_txns, find_by_xid, txn, (OMTVALUE) txn, for_recovery);
//We know it is the newest one.
r = toku_omt_insert_at(txn_manager->live_txns, txn, toku_omt_size(txn_manager->live_txns));
assert_zero(r);
{
// //
// maintain the data structures necessary for MVCC: // maintain the data structures necessary for MVCC:
// 1. add txn to list of live_root_txns if this is a root transaction // 1. add txn to list of live_root_txns if this is a root transaction
...@@ -411,12 +434,7 @@ int toku_txn_manager_start_txn( ...@@ -411,12 +434,7 @@ int toku_txn_manager_start_txn(
// add ancestor information, and maintain global live root txn list // add ancestor information, and maintain global live root txn list
if (parent == NULL) { if (parent == NULL) {
//Add txn to list (omt) of live root txns //Add txn to list (omt) of live root txns
r = toku_omt_insert_at( omt_insert_at_end_unless_recovery(txn_manager->live_root_txns, toku_find_xid_by_xid, txn, (OMTVALUE) txn->txnid64, for_recovery);
txn_manager->live_root_txns,
(OMTVALUE) txn->txnid64,
toku_omt_size(txn_manager->live_root_txns)
); //We know it is the newest one.
assert_zero(r);
} }
// setup information for snapshot reads // setup information for snapshot reads
...@@ -426,8 +444,11 @@ int toku_txn_manager_start_txn( ...@@ -426,8 +444,11 @@ int toku_txn_manager_start_txn(
if (parent == NULL || txn->snapshot_type == TXN_SNAPSHOT_CHILD) { if (parent == NULL || txn->snapshot_type == TXN_SNAPSHOT_CHILD) {
r = setup_live_root_txn_list(txn_manager, txn); r = setup_live_root_txn_list(txn_manager, txn);
assert_zero(r); assert_zero(r);
r = snapshot_txnids_note_txn(txn_manager, txn);
assert_zero(r); // Add this txn to the global list of txns that have their own snapshots.
// (Note, if a txn is a child that creates its own snapshot, then that child xid
// is the xid stored in the global list.)
omt_insert_at_end_unless_recovery(txn_manager->snapshot_txnids, toku_find_xid_by_xid, txn, (OMTVALUE) txn->txnid64, for_recovery);
} }
// in this case, it is a child transaction that specified its snapshot to be that // in this case, it is a child transaction that specified its snapshot to be that
// of the root transaction // of the root transaction
...@@ -615,6 +636,7 @@ void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn) { ...@@ -615,6 +636,7 @@ void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
assert(txn_manager->oldest_living_xid <= txn->txnid64); assert(txn_manager->oldest_living_xid <= txn->txnid64);
if (txn->txnid64 == txn_manager->oldest_living_xid) { if (txn->txnid64 == txn_manager->oldest_living_xid) {
// oldest_living_xid is always zero during recovery
OMTVALUE oldest_txnv; OMTVALUE oldest_txnv;
r = toku_omt_fetch(txn_manager->live_txns, 0, &oldest_txnv); r = toku_omt_fetch(txn_manager->live_txns, 0, &oldest_txnv);
if (r==0) { if (r==0) {
...@@ -659,16 +681,6 @@ void toku_txn_manager_clone_state_for_gc( ...@@ -659,16 +681,6 @@ void toku_txn_manager_clone_state_for_gc(
toku_mutex_unlock(&txn_manager->txn_manager_lock); toku_mutex_unlock(&txn_manager->txn_manager_lock);
} }
//Heaviside function to search through an OMT by a TXNID
static int
find_by_xid (OMTVALUE v, void *txnidv) {
TOKUTXN txn = v;
TXNID txnidfind = (TXNID)txnidv;
if (txn->txnid64<txnidfind) return -1;
if (txn->txnid64>txnidfind) return +1;
return 0;
}
void toku_txn_manager_id2txn_unlocked(TXN_MANAGER txn_manager, TXNID txnid, TOKUTXN *result) { void toku_txn_manager_id2txn_unlocked(TXN_MANAGER txn_manager, TXNID txnid, TOKUTXN *result) {
OMTVALUE txnfound; OMTVALUE txnfound;
int r = toku_omt_find_zero(txn_manager->live_txns, find_by_xid, (OMTVALUE)txnid, &txnfound, NULL); int r = toku_omt_find_zero(txn_manager->live_txns, find_by_xid, (OMTVALUE)txnid, &txnfound, NULL);
......
...@@ -89,11 +89,13 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -89,11 +89,13 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].operation_extra = &uoe; myargs[i].operation_extra = &uoe;
myargs[i].operation = update_op; myargs[i].operation = update_op;
myargs[i].do_prepare = true;
} }
// make the guy that does point queries // make the guy that does point queries
for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) { for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].operation = ptquery_op; myargs[i].operation = ptquery_op;
myargs[i].do_prepare = true;
} }
int num_seconds = random() % cli_args->time_of_test; int num_seconds = random() % cli_args->time_of_test;
......
...@@ -89,11 +89,13 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -89,11 +89,13 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) { for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].operation_extra = &uoe; myargs[i].operation_extra = &uoe;
myargs[i].operation = update_op; myargs[i].operation = update_op;
myargs[i].do_prepare = true;
} }
// make the guy that does point queries // make the guy that does point queries
for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) { for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].operation = ptquery_op; myargs[i].operation = ptquery_op;
myargs[i].do_prepare = true;
} }
run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args); run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args);
......
...@@ -167,6 +167,7 @@ struct arg { ...@@ -167,6 +167,7 @@ struct arg {
int thread_idx; int thread_idx;
int num_threads; int num_threads;
struct cli_args *cli; struct cli_args *cli;
bool do_prepare;
}; };
DB_TXN * const null_txn = 0; DB_TXN * const null_txn = 0;
...@@ -180,6 +181,7 @@ static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cl ...@@ -180,6 +181,7 @@ static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cl
arg->lock_type = STRESS_LOCK_NONE; arg->lock_type = STRESS_LOCK_NONE;
arg->txn_type = DB_TXN_SNAPSHOT; arg->txn_type = DB_TXN_SNAPSHOT;
arg->operation_extra = NULL; arg->operation_extra = NULL;
arg->do_prepare = false;
} }
enum operation_type { enum operation_type {
...@@ -401,6 +403,15 @@ static void *worker(void *arg_v) { ...@@ -401,6 +403,15 @@ static void *worker(void *arg_v) {
r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r); r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r);
} }
r = arg->operation(txn, arg, arg->operation_extra, we->counters); r = arg->operation(txn, arg, arg->operation_extra, we->counters);
if (!arg->cli->single_txn && arg->do_prepare) {
u_int8_t gid[DB_GID_SIZE];
memset(gid, 0, DB_GID_SIZE);
u_int64_t gid_val = txn->id64(txn);
u_int64_t *gid_count_p = (void *)gid; // make gcc --happy about -Wstrict-aliasing
*gid_count_p = gid_val;
int rr = txn->prepare(txn, gid);
assert_zero(rr);
}
if (r == 0) { if (r == 0) {
if (!arg->cli->single_txn) { if (!arg->cli->single_txn) {
{ int chk_r = txn->commit(txn,0); CKERR(chk_r); } { int chk_r = txn->commit(txn,0); CKERR(chk_r); }
...@@ -1302,6 +1313,28 @@ static int fill_tables_with_zeroes(DB **dbs, int num_DBs, int num_elements, u_in ...@@ -1302,6 +1313,28 @@ static int fill_tables_with_zeroes(DB **dbs, int num_DBs, int num_elements, u_in
return 0; return 0;
} }
static void do_xa_recovery(DB_ENV* env) {
DB_PREPLIST preplist[1];
long num_recovered= 0;
int r = 0;
r = env->txn_recover(env, preplist, 1, &num_recovered, DB_NEXT);
while(r==0 && num_recovered > 0) {
DB_TXN* recovered_txn = preplist[0].txn;
if (verbose) {
printf("recovering transaction with id %"PRIu64" \n", recovered_txn->id64(recovered_txn));
}
if (random() % 2 == 0) {
int rr = recovered_txn->commit(recovered_txn, 0);
CKERR(rr);
}
else {
int rr = recovered_txn->abort(recovered_txn);
CKERR(rr);
}
r = env->txn_recover(env, preplist, 1, &num_recovered, DB_NEXT);
}
}
static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
int (*bt_compare)(DB *, const DBT *, const DBT *), int (*bt_compare)(DB *, const DBT *, const DBT *),
struct env_args env_args) { struct env_args env_args) {
...@@ -1328,6 +1361,7 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, ...@@ -1328,6 +1361,7 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
CKERR(r); CKERR(r);
} }
r = env->open(env, env_args.envdir, DB_RECOVER|DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->open(env, env_args.envdir, DB_RECOVER|DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
do_xa_recovery(env);
r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r); r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r);
r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r); r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r);
r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r); r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r);
......
...@@ -141,6 +141,12 @@ toku_txn_id(DB_TXN * txn) { ...@@ -141,6 +141,12 @@ toku_txn_id(DB_TXN * txn) {
return -1; return -1;
} }
static u_int64_t
toku_txn_id64(DB_TXN * txn) {
HANDLE_PANICKED_ENV(txn->mgrp);
return toku_txn_get_id(db_txn_struct_i(txn)->tokutxn);
}
static int static int
toku_txn_abort_only(DB_TXN * txn, toku_txn_abort_only(DB_TXN * txn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) {
...@@ -402,6 +408,7 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) { ...@@ -402,6 +408,7 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
STXN(xa_prepare); STXN(xa_prepare);
STXN(txn_stat); STXN(txn_stat);
#undef STXN #undef STXN
result->id64 = toku_txn_id64;
result->parent = stxn; result->parent = stxn;
#if !TOKUDB_NATIVE_H #if !TOKUDB_NATIVE_H
...@@ -479,6 +486,7 @@ void toku_keep_prepared_txn_callback (DB_ENV *env, TOKUTXN tokutxn) { ...@@ -479,6 +486,7 @@ void toku_keep_prepared_txn_callback (DB_ENV *env, TOKUTXN tokutxn) {
STXN(prepare); STXN(prepare);
STXN(txn_stat); STXN(txn_stat);
#undef STXN #undef STXN
result->id64 = toku_txn_id64;
result->parent = NULL; result->parent = NULL;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment