Commit 541adde4 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:4765], [t:4876], [t:4890], merge to main

git-svn-id: file:///svn/toku/tokudb@44012 c7de825b-a66e-492c-adef-691d508d4ae1
parent 9977173d
...@@ -60,6 +60,7 @@ set(FT_SOURCES ...@@ -60,6 +60,7 @@ set(FT_SOURCES
sub_block.c sub_block.c
threadpool.c threadpool.c
txn.c txn.c
txn_manager.c
ule.c ule.c
workqueue.c workqueue.c
x1764.c x1764.c
......
...@@ -3348,8 +3348,12 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) { ...@@ -3348,8 +3348,12 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
} }
// Log all the open transactions MUST BE AFTER OPEN FILES // Log all the open transactions MUST BE AFTER OPEN FILES
{ {
ct->checkpoint_num_txns = toku_omt_size(logger->live_txns); ct->checkpoint_num_txns = toku_txn_manager_num_live_txns(logger->txn_manager);
int r = toku_omt_iterate(logger->live_txns, log_open_txn, NULL); int r = toku_txn_manager_iter_over_live_txns(
logger->txn_manager,
log_open_txn,
NULL
);
assert(r==0); assert(r==0);
} }
// Log rollback suppression for all the open files MUST BE AFTER TXNS // Log rollback suppression for all the open files MUST BE AFTER TXNS
......
...@@ -2249,29 +2249,19 @@ toku_bnc_flush_to_child( ...@@ -2249,29 +2249,19 @@ toku_bnc_flush_to_child(
// Run garbage collection, if we are a leaf entry. // Run garbage collection, if we are a leaf entry.
TOKULOGGER logger = toku_cachefile_logger(h->cf); TOKULOGGER logger = toku_cachefile_logger(h->cf);
if (child->height == 0 && logger) { if (child->height == 0 && logger) {
int r;
OMT snapshot_txnids = NULL; OMT snapshot_txnids = NULL;
OMT live_list_reverse = NULL; OMT live_list_reverse = NULL;
OMT live_root_txns = NULL; OMT live_root_txns = NULL;
{ toku_txn_manager_clone_state_for_gc(
toku_mutex_lock(&logger->txn_list_lock); logger->txn_manager,
r = toku_omt_clone_noptr(&snapshot_txnids, &snapshot_txnids,
logger->snapshot_txnids); &live_list_reverse,
assert_zero(r); &live_root_txns
r = toku_omt_clone_pool(&live_list_reverse, );
logger->live_list_reverse, size_t buffsize = toku_fifo_buffer_size_in_use(bnc->buffer);
sizeof(XID_PAIR_S)); STATUS_VALUE(FT_MSG_BYTES_OUT) += buffsize;
assert_zero(r); // may be misleading if there's a broadcast message in there
r = toku_omt_clone_noptr(&live_root_txns, STATUS_VALUE(FT_MSG_BYTES_CURR) -= buffsize;
logger->live_root_txns);
assert_zero(r);
// take advantage of surrounding mutex, update stats.
size_t buffsize = toku_fifo_buffer_size_in_use(bnc->buffer);
STATUS_VALUE(FT_MSG_BYTES_OUT) += buffsize;
// may be misleading if there's a broadcast message in there
STATUS_VALUE(FT_MSG_BYTES_CURR) -= buffsize;
toku_mutex_unlock(&logger->txn_list_lock);
}
// Perform the garbage collection. // Perform the garbage collection.
ft_leaf_gc_all_les(child, h, snapshot_txnids, live_list_reverse, live_root_txns); ft_leaf_gc_all_les(child, h, snapshot_txnids, live_list_reverse, live_root_txns);
...@@ -2609,25 +2599,27 @@ toku_ft_optimize (FT_HANDLE brt) { ...@@ -2609,25 +2599,27 @@ toku_ft_optimize (FT_HANDLE brt) {
int r = 0; int r = 0;
TOKULOGGER logger = toku_cachefile_logger(brt->ft->cf); TOKULOGGER logger = toku_cachefile_logger(brt->ft->cf);
TXNID oldest = toku_logger_get_oldest_living_xid(logger, NULL); if (logger) {
TXNID oldest = toku_txn_manager_get_oldest_living_xid(logger->txn_manager, NULL);
XIDS root_xids = xids_get_root_xids(); XIDS root_xids = xids_get_root_xids();
XIDS message_xids; XIDS message_xids;
if (oldest == TXNID_NONE_LIVING) { if (oldest == TXNID_NONE_LIVING) {
message_xids = root_xids; message_xids = root_xids;
} }
else { else {
r = xids_create_child(root_xids, &message_xids, oldest); r = xids_create_child(root_xids, &message_xids, oldest);
invariant(r==0); invariant(r==0);
} }
DBT key; DBT key;
DBT val; DBT val;
toku_init_dbt(&key); toku_init_dbt(&key);
toku_init_dbt(&val); toku_init_dbt(&val);
FT_MSG_S ftcmd = { FT_OPTIMIZE, ZERO_MSN, message_xids, .u.id={&key,&val}}; FT_MSG_S ftcmd = { FT_OPTIMIZE, ZERO_MSN, message_xids, .u.id={&key,&val}};
r = toku_ft_root_put_cmd(brt->ft, &ftcmd); r = toku_ft_root_put_cmd(brt->ft, &ftcmd);
xids_destroy(&message_xids); xids_destroy(&message_xids);
}
return r; return r;
} }
......
...@@ -134,6 +134,7 @@ typedef struct { ...@@ -134,6 +134,7 @@ typedef struct {
} FILENUMS; } FILENUMS;
typedef struct tokulogger *TOKULOGGER; typedef struct tokulogger *TOKULOGGER;
typedef struct txn_manager *TXN_MANAGER;
#define NULL_LOGGER ((TOKULOGGER)0) #define NULL_LOGGER ((TOKULOGGER)0)
typedef struct tokutxn *TOKUTXN; typedef struct tokutxn *TOKUTXN;
typedef struct txninfo *TXNINFO; typedef struct txninfo *TXNINFO;
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include <sys/types.h> #include <sys/types.h>
#include <string.h> #include <string.h>
#include <dirent.h> #include <dirent.h>
#include "txn_manager.h"
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" { extern "C" {
...@@ -75,17 +76,7 @@ struct tokulogger { ...@@ -75,17 +76,7 @@ struct tokulogger {
int lg_max; // The size of the single file in the log. Default is 100MB in TokuDB int lg_max; // The size of the single file in the log. Default is 100MB in TokuDB
// To access these, you must have the input lock // To access these, you must have the input lock
toku_mutex_t txn_list_lock; // a lock protecting live_list_reverse and snapshot_txnids for now TODO: revisit this decision
LSN lsn; // the next available lsn LSN lsn; // the next available lsn
OMT live_txns; // a sorted tree. Old comment said should be a hashtable. Do we still want that?
OMT live_root_txns; // a sorted tree.
OMT snapshot_txnids; //contains TXNID x | x is snapshot txn
//contains TXNID pairs (x,y) | y is oldest txnid s.t. x is in y's live list
// every TXNID that is in some snapshot's live list is used as the key for this OMT, x, as described above.
// The second half of the pair, y, is the youngest snapshot txnid (that is, has the highest LSN), such that x is in its live list.
// So, for example, Say T_800 begins, T_800 commits right after snapshot txn T_1100 begins. Then (800,1100) is in
// this list
OMT live_list_reverse;
struct logbuf inbuf; // data being accumulated for the write struct logbuf inbuf; // data being accumulated for the write
// To access these, you must have the output condition lock. // To access these, you must have the output condition lock.
...@@ -100,8 +91,6 @@ struct tokulogger { ...@@ -100,8 +91,6 @@ struct tokulogger {
TOKULOGFILEMGR logfilemgr; TOKULOGFILEMGR logfilemgr;
u_int32_t write_block_size; // How big should the blocks be written to various logs? u_int32_t write_block_size; // How big should the blocks be written to various logs?
TXNID oldest_living_xid;
time_t oldest_living_starttime; // timestamp in seconds of when txn with oldest_living_xid started
u_int64_t input_lock_ctr; // how many times has input_lock been taken and released u_int64_t input_lock_ctr; // how many times has input_lock been taken and released
u_int64_t output_condition_lock_ctr; // how many times has output_condition_lock been taken and released u_int64_t output_condition_lock_ctr; // how many times has output_condition_lock been taken and released
...@@ -109,8 +98,7 @@ struct tokulogger { ...@@ -109,8 +98,7 @@ struct tokulogger {
void (*remove_finalize_callback) (DICTIONARY_ID, void*); // ydb-level callback to be called when a transaction that ... void (*remove_finalize_callback) (DICTIONARY_ID, void*); // ydb-level callback to be called when a transaction that ...
void * remove_finalize_callback_extra; // ... deletes a file is committed or when one that creates a file is aborted. void * remove_finalize_callback_extra; // ... deletes a file is committed or when one that creates a file is aborted.
CACHEFILE rollback_cachefile; CACHEFILE rollback_cachefile;
struct toku_list prepared_txns; // transactions that have been prepared and are unresolved, but have not been returned through txn_recover. TXN_MANAGER txn_manager;
struct toku_list prepared_and_returned_txns; // transactions that have been prepared and unresolved, and have been returned through txn_recover. We need this list so that we can restart the recovery.
}; };
int toku_logger_find_next_unused_log_file(const char *directory, long long *result); int toku_logger_find_next_unused_log_file(const char *directory, long long *result);
...@@ -165,7 +153,6 @@ struct tokutxn { ...@@ -165,7 +153,6 @@ struct tokutxn {
BOOL recovered_from_checkpoint; BOOL recovered_from_checkpoint;
BOOL checkpoint_needed_before_commit; BOOL checkpoint_needed_before_commit;
TXN_IGNORE_S ignore_errors; // 2954
TOKUTXN_STATE state; TOKUTXN_STATE state;
LSN do_fsync_lsn; LSN do_fsync_lsn;
BOOL do_fsync; BOOL do_fsync;
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h" #include "includes.h"
#include "txn_manager.h"
static const int log_format_version=TOKU_LOG_VERSION; static const int log_format_version=TOKU_LOG_VERSION;
...@@ -68,7 +69,6 @@ static BOOL is_a_logfile (const char *name, long long *number_result) { ...@@ -68,7 +69,6 @@ static BOOL is_a_logfile (const char *name, long long *number_result) {
int toku_logger_create (TOKULOGGER *resultp) { int toku_logger_create (TOKULOGGER *resultp) {
int r;
TOKULOGGER MALLOC(result); TOKULOGGER MALLOC(result);
if (result==0) return errno; if (result==0) return errno;
result->is_open=FALSE; result->is_open=FALSE;
...@@ -82,11 +82,6 @@ int toku_logger_create (TOKULOGGER *resultp) { ...@@ -82,11 +82,6 @@ int toku_logger_create (TOKULOGGER *resultp) {
// ct is uninitialized on purpose // ct is uninitialized on purpose
result->lg_max = 100<<20; // 100MB default result->lg_max = 100<<20; // 100MB default
// lsn is uninitialized // lsn is uninitialized
toku_mutex_init(&result->txn_list_lock, NULL);
r = toku_omt_create(&result->live_txns); if (r!=0) goto panic;
r = toku_omt_create(&result->live_root_txns); if (r!=0) goto panic;
r = toku_omt_create(&result->snapshot_txnids); if (r!=0) goto panic;
r = toku_omt_create(&result->live_list_reverse); if (r!=0) goto panic;
result->inbuf = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN}; result->inbuf = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN};
result->outbuf = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN}; result->outbuf = (struct logbuf) {0, LOGGER_MIN_BUF_SIZE, toku_xmalloc(LOGGER_MIN_BUF_SIZE), ZERO_LSN};
// written_lsn is uninitialized // written_lsn is uninitialized
...@@ -95,8 +90,6 @@ int toku_logger_create (TOKULOGGER *resultp) { ...@@ -95,8 +90,6 @@ int toku_logger_create (TOKULOGGER *resultp) {
// next_log_file_number is uninitialized // next_log_file_number is uninitialized
// n_in_file is uninitialized // n_in_file is uninitialized
result->write_block_size = FT_DEFAULT_NODE_SIZE; // default logging size is the same as the default brt block size result->write_block_size = FT_DEFAULT_NODE_SIZE; // default logging size is the same as the default brt block size
result->oldest_living_xid = TXNID_NONE_LIVING;
result->oldest_living_starttime = 0;
toku_logfilemgr_create(&result->logfilemgr); toku_logfilemgr_create(&result->logfilemgr);
*resultp=result; *resultp=result;
ml_init(&result->input_lock); ml_init(&result->input_lock);
...@@ -107,13 +100,8 @@ int toku_logger_create (TOKULOGGER *resultp) { ...@@ -107,13 +100,8 @@ int toku_logger_create (TOKULOGGER *resultp) {
result->swap_ctr = 0; result->swap_ctr = 0;
result->rollback_cachefile = NULL; result->rollback_cachefile = NULL;
result->output_is_available = TRUE; result->output_is_available = TRUE;
toku_list_init(&result->prepared_txns); toku_txn_manager_init(&result->txn_manager);
toku_list_init(&result->prepared_and_returned_txns);
return 0; return 0;
panic:
toku_logger_panic(result, r);
return r;
} }
static int fsync_logdir(TOKULOGGER logger) { static int fsync_logdir(TOKULOGGER logger) {
...@@ -262,7 +250,7 @@ int toku_logger_close(TOKULOGGER *loggerp) { ...@@ -262,7 +250,7 @@ int toku_logger_close(TOKULOGGER *loggerp) {
if ( logger->write_log_files ) { if ( logger->write_log_files ) {
r = toku_file_fsync_without_accounting(logger->fd); if (r!=0) { r=errno; goto panic; } r = toku_file_fsync_without_accounting(logger->fd); if (r!=0) { r=errno; goto panic; }
} }
r = close(logger->fd); if (r!=0) { r=errno; goto panic; } r = close(logger->fd); if (r!=0) { r=errno; goto panic; }
} }
r = close_logdir(logger); if (r!=0) { r=errno; goto panic; } r = close_logdir(logger); if (r!=0) { r=errno; goto panic; }
logger->fd=-1; logger->fd=-1;
...@@ -276,11 +264,8 @@ int toku_logger_close(TOKULOGGER *loggerp) { ...@@ -276,11 +264,8 @@ int toku_logger_close(TOKULOGGER *loggerp) {
toku_mutex_destroy(&logger->output_condition_lock); toku_mutex_destroy(&logger->output_condition_lock);
toku_cond_destroy(&logger->output_condition); toku_cond_destroy(&logger->output_condition);
logger->is_panicked=TRUE; // Just in case this might help. logger->is_panicked=TRUE; // Just in case this might help.
toku_txn_manager_destroy(logger->txn_manager);
if (logger->directory) toku_free(logger->directory); if (logger->directory) toku_free(logger->directory);
toku_omt_destroy(&logger->live_txns);
toku_omt_destroy(&logger->live_root_txns);
toku_omt_destroy(&logger->snapshot_txnids);
toku_omt_destroy(&logger->live_list_reverse);
toku_logfilemgr_destroy(&logger->logfilemgr); toku_logfilemgr_destroy(&logger->logfilemgr);
toku_free(logger); toku_free(logger);
*loggerp=0; *loggerp=0;
...@@ -293,9 +278,8 @@ int toku_logger_close(TOKULOGGER *loggerp) { ...@@ -293,9 +278,8 @@ int toku_logger_close(TOKULOGGER *loggerp) {
int toku_logger_shutdown(TOKULOGGER logger) { int toku_logger_shutdown(TOKULOGGER logger) {
int r = 0; int r = 0;
if (logger->is_open) { if (logger->is_open) {
if (toku_omt_size(logger->live_txns) == 0) { if (toku_txn_manager_num_live_txns(logger->txn_manager) == 0) {
int r2 = toku_log_shutdown(logger, NULL, TRUE, 0); r = toku_log_shutdown(logger, NULL, TRUE, 0);
if (!r) r = r2;
} }
} }
return r; return r;
...@@ -1201,43 +1185,10 @@ TOKULOGGER toku_txn_logger (TOKUTXN txn) { ...@@ -1201,43 +1185,10 @@ TOKULOGGER toku_txn_logger (TOKUTXN txn) {
return txn ? txn->logger : 0; return txn ? txn->logger : 0;
} }
//Heaviside function to search through an OMT by a TXNID
static int
find_by_xid (OMTVALUE v, void *txnidv) {
TOKUTXN txn = v;
TXNID txnidfind = *(TXNID*)txnidv;
if (txn->txnid64<txnidfind) return -1;
if (txn->txnid64>txnidfind) return +1;
return 0;
}
BOOL is_txnid_live(TOKULOGGER logger, TXNID txnid) {
assert(logger);
TOKUTXN result = NULL;
int rval = toku_txnid2txn(logger, txnid, &result);
assert(rval == 0);
return (result != NULL);
}
int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result) { int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result) {
if (logger==NULL) return -1; if (logger==NULL) return -1;
toku_txn_manager_id2txn(logger->txn_manager, txnid, result);
OMTVALUE txnfound; return 0;
int rval;
int r = toku_omt_find_zero(logger->live_txns, find_by_xid, &txnid, &txnfound, NULL);
if (r==0) {
TOKUTXN txn = txnfound;
assert(txn->txnid64==txnid);
*result = txn;
rval = 0;
}
else {
assert(r==DB_NOTFOUND);
// If there is no txn, then we treat it as the null txn.
*result = NULL;
rval = 0;
}
return rval;
} }
// Find the earliest LSN in a log. No locks are needed. // Find the earliest LSN in a log. No locks are needed.
...@@ -1343,16 +1294,6 @@ void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn) { ...@@ -1343,16 +1294,6 @@ void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn) {
logger->last_completed_checkpoint_lsn = lsn; logger->last_completed_checkpoint_lsn = lsn;
} }
TXNID toku_logger_get_oldest_living_xid(TOKULOGGER logger, time_t * oldest_living_starttime) {
TXNID rval = 0;
if (logger) {
rval = logger->oldest_living_xid;
if (oldest_living_starttime)
*oldest_living_starttime = logger->oldest_living_starttime;
}
return rval;
}
LSN LSN
toku_logger_get_next_lsn(TOKULOGGER logger) { toku_logger_get_next_lsn(TOKULOGGER logger) {
return logger->lsn; return logger->lsn;
...@@ -1470,4 +1411,8 @@ toku_get_version_of_logs_on_disk(const char *log_dir, BOOL *found_any_logs, uint ...@@ -1470,4 +1411,8 @@ toku_get_version_of_logs_on_disk(const char *log_dir, BOOL *found_any_logs, uint
return r; return r;
} }
TXN_MANAGER toku_logger_get_txn_manager(TOKULOGGER logger) {
return logger->txn_manager;
}
#undef STATUS_VALUE #undef STATUS_VALUE
...@@ -97,7 +97,6 @@ TXNID toku_txn_get_root_txnid (TOKUTXN txn); ...@@ -97,7 +97,6 @@ TXNID toku_txn_get_root_txnid (TOKUTXN txn);
LSN toku_logger_last_lsn(TOKULOGGER logger); LSN toku_logger_last_lsn(TOKULOGGER logger);
TOKULOGGER toku_txn_logger (TOKUTXN txn); TOKULOGGER toku_txn_logger (TOKUTXN txn);
BOOL is_txnid_live(TOKULOGGER logger, TXNID txnid);
int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result); int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result);
//int toku_logger_log_checkpoint (TOKULOGGER); //int toku_logger_log_checkpoint (TOKULOGGER);
//int toku_set_func_fsync (int (*fsync_function)(int)); //int toku_set_func_fsync (int (*fsync_function)(int));
...@@ -106,7 +105,6 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags); ...@@ -106,7 +105,6 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags);
TOKUTXN toku_logger_txn_parent (TOKUTXN txn); TOKUTXN toku_logger_txn_parent (TOKUTXN txn);
void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn); void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn);
TXNID toku_logger_get_oldest_living_xid(TOKULOGGER logger, time_t * oldest_living_starttime);
LSN toku_logger_get_next_lsn(TOKULOGGER logger); LSN toku_logger_get_next_lsn(TOKULOGGER logger);
void toku_logger_set_remove_finalize_callback(TOKULOGGER logger, void (*funcp)(DICTIONARY_ID, void *), void * extra); void toku_logger_set_remove_finalize_callback(TOKULOGGER logger, void (*funcp)(DICTIONARY_ID, void *), void * extra);
void toku_logger_call_remove_finalize_callback(TOKULOGGER logger, DICTIONARY_ID dict_id); void toku_logger_call_remove_finalize_callback(TOKULOGGER logger, DICTIONARY_ID dict_id);
...@@ -191,6 +189,9 @@ void toku_logger_get_status(TOKULOGGER logger, LOGGER_STATUS s); ...@@ -191,6 +189,9 @@ void toku_logger_get_status(TOKULOGGER logger, LOGGER_STATUS s);
int toku_get_version_of_logs_on_disk(const char *log_dir, BOOL *found_any_logs, uint32_t *version_found); int toku_get_version_of_logs_on_disk(const char *log_dir, BOOL *found_any_logs, uint32_t *version_found);
int toku_delete_all_logs_of_version(const char *log_dir, uint32_t version_to_delete); int toku_delete_all_logs_of_version(const char *log_dir, uint32_t version_to_delete);
TXN_MANAGER toku_logger_get_txn_manager(TOKULOGGER logger);
static const TOKULOGGER NULL_logger __attribute__((__unused__)) = NULL; static const TOKULOGGER NULL_logger __attribute__((__unused__)) = NULL;
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus) || defined(__cilkplusplus)
......
...@@ -1225,51 +1225,69 @@ int tokudb_needs_recovery(const char *log_dir, BOOL ignore_log_empty) { ...@@ -1225,51 +1225,69 @@ int tokudb_needs_recovery(const char *log_dir, BOOL ignore_log_empty) {
} }
static uint32_t recover_get_num_live_txns(RECOVER_ENV renv) { static uint32_t recover_get_num_live_txns(RECOVER_ENV renv) {
return toku_omt_size(renv->logger->live_txns); return toku_txn_manager_num_live_txns(renv->logger->txn_manager);
} }
static int
is_txn_unprepared (OMTVALUE txnv, u_int32_t UU(index), void* extra) {
TOKUTXN txn = txnv;
if (txn->state != TOKUTXN_PREPARING) {
*(TOKUTXN *)extra = txn;
return -1; // return -1 to get iterator to return
}
return 0;
}
static int find_an_unprepared_txn (RECOVER_ENV renv, TOKUTXN *txnp) { static int find_an_unprepared_txn (RECOVER_ENV renv, TOKUTXN *txnp) {
u_int32_t n_live_txns = toku_omt_size(renv->logger->live_txns); TOKUTXN txn = NULL;
for (u_int32_t i=0; i<n_live_txns; i++) { int r = toku_txn_manager_iter_over_live_txns(
OMTVALUE v; renv->logger->txn_manager,
int r = toku_omt_fetch(renv->logger->live_txns, n_live_txns-1-i, &v); is_txn_unprepared,
assert(r==0); &txn
TOKUTXN txn = (TOKUTXN) v; );
if (txn->state == TOKUTXN_PREPARING) assert(r == 0 || r == -1);
continue; if (txn != NULL) {
*txnp = txn; *txnp = txn;
return 0; return 0;
} }
return DB_NOTFOUND; return DB_NOTFOUND;
} }
static int
call_prepare_txn_callback_iter (OMTVALUE txnv, u_int32_t UU(index), void* extra) {
TOKUTXN txn = txnv;
RECOVER_ENV renv = extra;
renv->prepared_txn_callback(renv->env, txn);
return 0;
}
// abort all of the remaining live transactions in descending transaction id order // abort all of the remaining live transactions in descending transaction id order
static void recover_abort_live_txns(RECOVER_ENV renv) { static void recover_abort_live_txns(RECOVER_ENV renv) {
while (1) { while (1) {
TOKUTXN txn; TOKUTXN txn;
int r = find_an_unprepared_txn (renv, &txn); int r = find_an_unprepared_txn(renv, &txn);
if (r==0) { if (r==0) {
// abort the transaction // abort the transaction
r = toku_txn_abort_txn(txn, recover_yield, NULL, NULL, NULL); r = toku_txn_abort_txn(txn, recover_yield, NULL, NULL, NULL);
assert(r == 0); assert(r == 0);
// close the transaction // close the transaction
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
} else if (r==DB_NOTFOUND) { } else if (r==DB_NOTFOUND) {
break; break;
} else { } else {
abort(); abort();
} }
} }
// Now we have only prepared txns. These prepared txns don't have full DB_TXNs in them, so we need to make some. // Now we have only prepared txns. These prepared txns don't have full DB_TXNs in them, so we need to make some.
for (u_int32_t i=0; i<toku_omt_size(renv->logger->live_txns); i++) { int r = toku_txn_manager_iter_over_live_txns(
OMTVALUE v; renv->logger->txn_manager,
int r = toku_omt_fetch(renv->logger->live_txns, i, &v); call_prepare_txn_callback_iter,
assert(r==0); renv
TOKUTXN txn = v; );
renv->prepared_txn_callback(renv->env, txn); assert_zero(r);
}
} }
static void recover_trace_le(const char *f, int l, int r, struct log_entry *le) { static void recover_trace_le(const char *f, int l, int r, struct log_entry *le) {
......
...@@ -171,8 +171,7 @@ static int do_insertion (enum ft_msg_type type, FILENUM filenum, BYTESTRING key, ...@@ -171,8 +171,7 @@ static int do_insertion (enum ft_msg_type type, FILENUM filenum, BYTESTRING key,
BOOL reset_root_xid_that_created) { BOOL reset_root_xid_that_created) {
CACHEFILE cf; CACHEFILE cf;
// 2954 - ignore messages for aborted hot-index // 2954 - ignore messages for aborted hot-index
int r = toku_txn_ignore_contains(txn, filenum); int r = 0;
if ( r != ENOENT ) goto done; // ENOENT => filenum not in ignore list
//printf("%s:%d committing insert %s %s\n", __FILE__, __LINE__, key.data, data.data); //printf("%s:%d committing insert %s %s\n", __FILE__, __LINE__, key.data, data.data);
r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf); r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf);
if (r==ENOENT) { //Missing file on recovered transaction is not an error if (r==ENOENT) { //Missing file on recovered transaction is not an error
...@@ -510,21 +509,6 @@ toku_commit_hot_index (FILENUMS UU(hot_index_filenums), ...@@ -510,21 +509,6 @@ toku_commit_hot_index (FILENUMS UU(hot_index_filenums),
return 0; return 0;
} }
//2954
// function called by toku_omt_iterate to add hot_index filenums to
// each live txn's ignore list when a hot index is aborted
static int
live_txn_ignore(OMTVALUE vtxn, u_int32_t UU(idx) , void *vfn) {
TOKUTXN txn = vtxn;
FILENUMS *hot_index_filenums = vfn;
int r;
for (uint32_t i=0; i<hot_index_filenums->num;i++) {
r = toku_txn_ignore_add(txn, hot_index_filenums->filenums[i]);
invariant(r==0);
}
return 0;
}
int int
toku_rollback_hot_index (FILENUMS UU(hot_index_filenums), toku_rollback_hot_index (FILENUMS UU(hot_index_filenums),
TOKUTXN UU(txn), TOKUTXN UU(txn),
...@@ -532,8 +516,7 @@ toku_rollback_hot_index (FILENUMS UU(hot_index_filenums), ...@@ -532,8 +516,7 @@ toku_rollback_hot_index (FILENUMS UU(hot_index_filenums),
void * UU(yield_v), void * UU(yield_v),
LSN UU(oplsn)) LSN UU(oplsn))
{ {
int r = toku_omt_iterate(txn->logger->live_txns, live_txn_ignore, &hot_index_filenums); return 0;
return r;
} }
int int
......
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#include "test.h"
#include "includes.h"
#include "../fttypes.h"
#include "../txn.h"
/*
* a test of the txn filenums to ignore utilities:
* - toku_txn_ignore_create()
* - toku_txn_ignore_free()
* - toku_txn_ignore_add()
* - toku_txn_ignore_delete()
* - toku_txn_ignore_contains()
*/
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
TOKUTXN txn = (TOKUTXN) toku_malloc(sizeof(struct tokutxn));
int r;
toku_txn_ignore_init(txn);
FILENUM f1 = {1};
FILENUM f2 = {2};
FILENUM f3 = {3};
FILENUM f4 = {4};
FILENUM f5 = {5};
FILENUM f6 = {6};
FILENUM f7 = {7};
FILENUM f8 = {8};
FILENUM f9 = {9};
r = toku_txn_ignore_add(txn, f1); CKERR(r);
r = toku_txn_ignore_add(txn, f3); CKERR(r);
r = toku_txn_ignore_add(txn, f5); CKERR(r);
r = toku_txn_ignore_add(txn, f7); CKERR(r);
r = toku_txn_ignore_add(txn, f9); CKERR(r);
r = toku_txn_ignore_remove(txn, f3); CKERR(r);
r = toku_txn_ignore_remove(txn, f2); assert( r == ENOENT );
r = toku_txn_ignore_contains(txn, f1); CKERR(r);
r = toku_txn_ignore_contains(txn, f2); assert( r == ENOENT );
r = toku_txn_ignore_contains(txn, f3); assert( r == ENOENT );
r = toku_txn_ignore_contains(txn, f4); assert( r == ENOENT );
r = toku_txn_ignore_contains(txn, f5); CKERR(r);
r = toku_txn_ignore_contains(txn, f6); assert( r == ENOENT );
r = toku_txn_ignore_contains(txn, f7); CKERR(r);
r = toku_txn_ignore_contains(txn, f8); assert( r == ENOENT );
r = toku_txn_ignore_contains(txn, f9); CKERR(r);
assert(txn->ignore_errors.fns_allocated == 8);
assert(txn->ignore_errors.filenums.num == 4);
r = toku_txn_ignore_add(txn, f2); CKERR(r);
r = toku_txn_ignore_add(txn, f3); CKERR(r);
r = toku_txn_ignore_add(txn, f4); CKERR(r);
r = toku_txn_ignore_add(txn, f6); CKERR(r);
r = toku_txn_ignore_add(txn, f8); CKERR(r);
TXN_IGNORE txni = &(txn->ignore_errors); // test using code similar to that in txn.c
assert(txni->fns_allocated == 16);
assert(txni->filenums.num == 9);
// check that dups are ignored
for (int i=0;i<10;i++) {
r = toku_txn_ignore_add(txn, f2); CKERR(r);
}
assert(txn->ignore_errors.fns_allocated == 16);
assert(txn->ignore_errors.filenums.num == 9);
toku_txn_ignore_free(txn);
toku_free(txn);
return 0;
}
...@@ -10,10 +10,7 @@ ...@@ -10,10 +10,7 @@
#include "checkpoint.h" #include "checkpoint.h"
#include "ule.h" #include "ule.h"
#include <valgrind/helgrind.h> #include <valgrind/helgrind.h>
#include "txn_manager.h"
BOOL garbage_collection_debug = FALSE;
static void verify_snapshot_system(TOKULOGGER logger);
/////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////
// Engine status // Engine status
...@@ -39,8 +36,6 @@ status_init(void) { ...@@ -39,8 +36,6 @@ status_init(void) {
STATUS_INIT(TXN_CLOSE, UINT64, "close (should be sum of aborts and commits)"); STATUS_INIT(TXN_CLOSE, UINT64, "close (should be sum of aborts and commits)");
STATUS_INIT(TXN_NUM_OPEN, UINT64, "number currently open (should be begin - close)"); STATUS_INIT(TXN_NUM_OPEN, UINT64, "number currently open (should be begin - close)");
STATUS_INIT(TXN_MAX_OPEN, UINT64, "max number open simultaneously"); STATUS_INIT(TXN_MAX_OPEN, UINT64, "max number open simultaneously");
STATUS_INIT(TXN_OLDEST_LIVE, UINT64, "xid of oldest live transaction");
STATUS_INIT(TXN_OLDEST_STARTTIME, UNIXTIME, "start time of oldest live transaction");
txn_status.initialized = true; txn_status.initialized = true;
} }
#undef STATUS_INIT #undef STATUS_INIT
...@@ -48,13 +43,9 @@ status_init(void) { ...@@ -48,13 +43,9 @@ status_init(void) {
#define STATUS_VALUE(x) txn_status.status[x].value.num #define STATUS_VALUE(x) txn_status.status[x].value.num
void void
toku_txn_get_status(TOKULOGGER logger, TXN_STATUS s) { toku_txn_get_status(TXN_STATUS s) {
if (!txn_status.initialized) if (!txn_status.initialized) {
status_init(); status_init();
{
time_t oldest_starttime;
STATUS_VALUE(TXN_OLDEST_LIVE) = toku_logger_get_oldest_living_xid(logger, &oldest_starttime);
STATUS_VALUE(TXN_OLDEST_STARTTIME) = (uint64_t) oldest_starttime;
} }
*s = txn_status; *s = txn_status;
} }
...@@ -83,8 +74,9 @@ toku_txn_begin_with_xid ( ...@@ -83,8 +74,9 @@ toku_txn_begin_with_xid (
) )
{ {
int r = toku_txn_create_txn(tokutxn, parent_tokutxn, logger, xid, snapshot_type, container_db_txn); int r = toku_txn_create_txn(tokutxn, parent_tokutxn, logger, xid, snapshot_type, container_db_txn);
if (r == 0) if (r == 0) {
toku_txn_start_txn(*tokutxn); toku_txn_manager_start_txn((*tokutxn)->logger->txn_manager, *tokutxn);
}
return r; return r;
} }
...@@ -98,77 +90,6 @@ void toku_txn_set_container_db_txn (TOKUTXN tokutxn, DB_TXN*container) { ...@@ -98,77 +90,6 @@ void toku_txn_set_container_db_txn (TOKUTXN tokutxn, DB_TXN*container) {
tokutxn->container_db_txn = container; tokutxn->container_db_txn = container;
} }
// Create list of root transactions that were live when this txn began.
static int
setup_live_root_txn_list(TOKUTXN txn) {
OMT global = txn->logger->live_root_txns;
int r = toku_omt_clone_noptr(
&txn->live_root_txn_list,
global
);
return r;
}
// Add this txn to the global list of txns that have their own snapshots.
// (Note, if a txn is a child that creates its own snapshot, then that child xid
// is the xid stored in the global list.)
static int
snapshot_txnids_note_txn(TOKUTXN txn) {
int r;
OMT txnids = txn->logger->snapshot_txnids;
r = toku_omt_insert_at(txnids, (OMTVALUE) txn->txnid64, toku_omt_size(txnids));
assert_zero(r);
return r;
}
// If live txn is not in reverse live list, then add it.
// If live txn is in reverse live list, update it by setting second xid in pair to new txn that is being started.
static int
live_list_reverse_note_txn_start_iter(OMTVALUE live_xidv, u_int32_t UU(index), void*txnv) {
TOKUTXN txn = txnv;
TXNID xid = txn->txnid64; // xid of new txn that is being started
TXNID live_xid = (TXNID)live_xidv; // xid on the new txn's live list
OMTVALUE pairv;
XID_PAIR pair;
uint32_t idx;
int r;
OMT reverse = txn->logger->live_list_reverse;
r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, (void *)live_xid, &pairv, &idx);
if (r==0) {
pair = pairv;
invariant(pair->xid1 == live_xid); //sanity check
invariant(pair->xid2 < xid); //Must be older
pair->xid2 = txn->txnid64;
}
else {
invariant(r==DB_NOTFOUND);
//Make new entry
XMALLOC(pair);
pair->xid1 = live_xid;
pair->xid2 = txn->txnid64;
r = toku_omt_insert_at(reverse, pair, idx);
assert_zero(r);
}
return r;
}
// Maintain the reverse live list. The reverse live list is a list of xid pairs. The first xid in the pair
// is a txn that was live when some txn began, and the second xid in the pair is the newest still-live xid to
// have that first xid in its live list. (The first xid may be closed, it only needed to be live when the
// second txn began.)
// When a new txn begins, we need to scan the live list of this new txn. For each live txn, we either
// add it to the reverse live list (if it is not already there), or update to the reverse live list so
// that this new txn is the second xid in the pair associated with the txn in the live list.
static int
live_list_reverse_note_txn_start(TOKUTXN txn) {
int r;
r = toku_omt_iterate(txn->live_root_txn_list, live_list_reverse_note_txn_start_iter, txn);
assert_zero(r);
return r;
}
static void invalidate_xa_xid (TOKU_XA_XID *xid) { static void invalidate_xa_xid (TOKU_XA_XID *xid) {
ANNOTATE_NEW_MEMORY(xid, sizeof(*xid)); // consider it to be all invalid for valgrind ANNOTATE_NEW_MEMORY(xid, sizeof(*xid)); // consider it to be all invalid for valgrind
xid->formatID = -1; // According to the XA spec, -1 means "invalid data" xid->formatID = -1; // According to the XA spec, -1 means "invalid data"
...@@ -185,9 +106,6 @@ toku_txn_create_txn ( ...@@ -185,9 +106,6 @@ toku_txn_create_txn (
) )
{ {
if (logger->is_panicked) return EINVAL; if (logger->is_panicked) return EINVAL;
if (garbage_collection_debug) {
verify_snapshot_system(logger);
}
assert(logger->rollback_cachefile); assert(logger->rollback_cachefile);
TOKUTXN XMALLOC(result); TOKUTXN XMALLOC(result);
result->starttime = time(NULL); // getting timestamp in seconds is a cheap call result->starttime = time(NULL); // getting timestamp in seconds is a cheap call
...@@ -220,8 +138,6 @@ toku_txn_create_txn ( ...@@ -220,8 +138,6 @@ toku_txn_create_txn (
invalidate_xa_xid(&result->xa_xid); invalidate_xa_xid(&result->xa_xid);
result->do_fsync = FALSE; result->do_fsync = FALSE;
toku_txn_ignore_init(result); // 2954
result->txnid64 = xid; result->txnid64 = xid;
result->xids = NULL; result->xids = NULL;
...@@ -232,98 +148,9 @@ toku_txn_create_txn ( ...@@ -232,98 +148,9 @@ toku_txn_create_txn (
if (STATUS_VALUE(TXN_NUM_OPEN) > STATUS_VALUE(TXN_MAX_OPEN)) if (STATUS_VALUE(TXN_NUM_OPEN) > STATUS_VALUE(TXN_MAX_OPEN))
STATUS_VALUE(TXN_MAX_OPEN) = STATUS_VALUE(TXN_NUM_OPEN); STATUS_VALUE(TXN_MAX_OPEN) = STATUS_VALUE(TXN_NUM_OPEN);
if (garbage_collection_debug) {
verify_snapshot_system(logger);
}
return 0; return 0;
} }
void
toku_txn_start_txn(TOKUTXN txn) {
TOKULOGGER logger = txn->logger;
TOKUTXN parent = txn->parent;
int r;
if (txn->txnid64 == TXNID_NONE) {
LSN first_lsn;
r = toku_log_xbegin(logger, &first_lsn, 0, parent ? parent->txnid64 : 0);
assert_zero(r);
txn->txnid64 = first_lsn.lsn;
}
XIDS parent_xids;
if (parent == NULL)
parent_xids = xids_get_root_xids();
else
parent_xids = parent->xids;
r = xids_create_child(parent_xids, &txn->xids, txn->txnid64);
assert_zero(r);
if (toku_omt_size(logger->live_txns) == 0) {
assert(logger->oldest_living_xid == TXNID_NONE_LIVING);
logger->oldest_living_xid = txn->txnid64;
logger->oldest_living_starttime = txn->starttime;
}
assert(logger->oldest_living_xid <= txn->txnid64);
toku_mutex_lock(&logger->txn_list_lock);
{
//Add txn to list (omt) of live transactions
//We know it is the newest one.
r = toku_omt_insert_at(logger->live_txns, txn, toku_omt_size(logger->live_txns));
assert_zero(r);
//
// maintain the data structures necessary for MVCC:
// 1. add txn to list of live_root_txns if this is a root transaction
// 2. if the transaction is creating a snapshot:
// - create a live list for the transaction
// - add the id to the list of snapshot ids
// - make the necessary modifications to the live_list_reverse
//
// The order of operations is important here, and must be taken
// into account when the transaction is closed. The txn is added
// to the live_root_txns first (if it is a root txn). This has the implication
// that a root level snapshot transaction is in its own live list. This fact
// is taken into account when the transaction is closed.
//
// add ancestor information, and maintain global live root txn list
if (parent == NULL) {
//Add txn to list (omt) of live root txns
r = toku_omt_insert_at(logger->live_root_txns, (OMTVALUE) txn->txnid64, toku_omt_size(logger->live_root_txns)); //We know it is the newest one.
assert_zero(r);
txn->ancestor_txnid64 = txn->txnid64;
}
else {
txn->ancestor_txnid64 = parent->ancestor_txnid64;
}
// setup information for snapshot reads
if (txn->snapshot_type != TXN_SNAPSHOT_NONE) {
// in this case, either this is a root level transaction that needs its live list setup, or it
// is a child transaction that specifically asked for its own snapshot
if (parent == NULL || txn->snapshot_type == TXN_SNAPSHOT_CHILD) {
r = setup_live_root_txn_list(txn);
assert_zero(r);
txn->snapshot_txnid64 = txn->txnid64;
r = snapshot_txnids_note_txn(txn);
assert_zero(r);
r = live_list_reverse_note_txn_start(txn);
assert_zero(r);
}
// in this case, it is a child transaction that specified its snapshot to be that
// of the root transaction
else if (txn->snapshot_type == TXN_SNAPSHOT_ROOT) {
txn->live_root_txn_list = parent->live_root_txn_list;
txn->snapshot_txnid64 = parent->snapshot_txnid64;
}
else {
assert(FALSE);
}
}
}
toku_mutex_unlock(&logger->txn_list_lock);
}
//Used on recovery to recover a transaction. //Used on recovery to recover a transaction.
int int
toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info) { toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info) {
...@@ -391,14 +218,7 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv ...@@ -391,14 +218,7 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
// Effect: Among other things: if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken) // Effect: Among other things: if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{ {
if (txn->state==TOKUTXN_PREPARING) { toku_txn_manager_note_commit_txn(txn->logger->txn_manager, txn);
invalidate_xa_xid(&txn->xa_xid);
toku_list_remove(&txn->prepared_txns_link);
}
txn->state = TOKUTXN_COMMITTING;
if (garbage_collection_debug) {
verify_snapshot_system(txn->logger);
}
int r; int r;
// panic handled in log_commit // panic handled in log_commit
...@@ -443,18 +263,7 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn, ...@@ -443,18 +263,7 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
// Effect: Ammong other things, if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken) // Effect: Ammong other things, if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{ {
if (txn->state==TOKUTXN_PREPARING) { toku_txn_manager_note_abort_txn(txn->logger->txn_manager, txn);
invalidate_xa_xid(&txn->xa_xid);
toku_list_remove(&txn->prepared_txns_link);
}
txn->state = TOKUTXN_ABORTING;
if (garbage_collection_debug) {
verify_snapshot_system(txn->logger);
}
//printf("%s:%d aborting\n", __FILE__, __LINE__);
// Must undo everything. Must undo it all in reverse order.
// Build the reverse list
//printf("%s:%d abort\n", __FILE__, __LINE__);
txn->progress_poll_fun = poll; txn->progress_poll_fun = poll;
txn->progress_poll_fun_extra = poll_extra; txn->progress_poll_fun_extra = poll_extra;
...@@ -477,14 +286,12 @@ static void copy_xid (TOKU_XA_XID *dest, TOKU_XA_XID *source) { ...@@ -477,14 +286,12 @@ static void copy_xid (TOKU_XA_XID *dest, TOKU_XA_XID *source) {
} }
int toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xa_xid) { int toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xa_xid) {
assert(txn->state==TOKUTXN_LIVE);
txn->state = TOKUTXN_PREPARING; // This state transition must be protected against begin_checkpoint. Right now it uses the ydb lock.
if (txn->parent) return 0; // nothing to do if there's a parent. if (txn->parent) return 0; // nothing to do if there's a parent.
toku_txn_manager_add_prepared_txn(txn->logger->txn_manager, txn);
// Do we need to do an fsync? // Do we need to do an fsync?
txn->do_fsync = (txn->force_fsync_on_commit || txn->num_rollentries>0); txn->do_fsync = (txn->force_fsync_on_commit || txn->num_rollentries>0);
copy_xid(&txn->xa_xid, xa_xid); copy_xid(&txn->xa_xid, xa_xid);
// This list will go away with #4683, so we wn't need the ydb lock for this anymore. // This list will go away with #4683, so we wn't need the ydb lock for this anymore.
toku_list_push(&txn->logger->prepared_txns, &txn->prepared_txns_link);
return toku_log_xprepare(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64, xa_xid); return toku_log_xprepare(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64, xa_xid);
} }
...@@ -492,53 +299,14 @@ void toku_txn_get_prepared_xa_xid (TOKUTXN txn, TOKU_XA_XID *xid) { ...@@ -492,53 +299,14 @@ void toku_txn_get_prepared_xa_xid (TOKUTXN txn, TOKU_XA_XID *xid) {
copy_xid(xid, &txn->xa_xid); copy_xid(xid, &txn->xa_xid);
} }
int toku_logger_get_txn_from_xid (TOKULOGGER logger, TOKU_XA_XID *xid, DB_TXN **txnp) {
int num_live_txns = toku_omt_size(logger->live_txns);
for (int i = 0; i < num_live_txns; i++) {
OMTVALUE v;
{
int r = toku_omt_fetch(logger->live_txns, i, &v);
assert_zero(r);
}
TOKUTXN txn = v;
if (txn->xa_xid.formatID == xid->formatID
&& txn->xa_xid.gtrid_length == xid->gtrid_length
&& txn->xa_xid.bqual_length == xid->bqual_length
&& 0==memcmp(txn->xa_xid.data, xid->data, xid->gtrid_length + xid->bqual_length)) {
*txnp = txn->container_db_txn;
return 0;
}
}
return DB_NOTFOUND;
}
int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) { int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) {
if (flags==DB_FIRST) { return toku_txn_manager_recover_txn(
// Anything in the returned list goes back on the prepared list. logger->txn_manager,
while (!toku_list_empty(&logger->prepared_and_returned_txns)) { preplist,
struct toku_list *h = toku_list_head(&logger->prepared_and_returned_txns); count,
toku_list_remove(h); retp,
toku_list_push(&logger->prepared_txns, h); flags
} );
} else if (flags!=DB_NEXT) {
return EINVAL;
}
long i;
for (i=0; i<count; i++) {
if (!toku_list_empty(&logger->prepared_txns)) {
struct toku_list *h = toku_list_head(&logger->prepared_txns);
toku_list_remove(h);
toku_list_push(&logger->prepared_and_returned_txns, h);
TOKUTXN txn = toku_list_struct(h, struct tokutxn, prepared_txns_link);
assert(txn->container_db_txn);
preplist[i].txn = txn->container_db_txn;
preplist[i].xid = txn->xa_xid;
} else {
break;
}
}
*retp = i;
return 0;
} }
struct txn_fsync_log_info { struct txn_fsync_log_info {
...@@ -572,83 +340,6 @@ void toku_txn_close_txn(TOKUTXN txn) { ...@@ -572,83 +340,6 @@ void toku_txn_close_txn(TOKUTXN txn) {
toku_txn_destroy_txn(txn); toku_txn_destroy_txn(txn);
} }
// For each xid on the closing txn's live list, find the corresponding entry in the reverse live list.
// There must be one.
// If the second xid in the pair is not the xid of the closing transaction, then the second xid must be newer
// than the closing txn, and there is nothing to be done (except to assert the invariant).
// If the second xid in the pair is the xid of the closing transaction, then we need to find the next oldest
// txn. If the live_xid is in the live list of the next oldest txn, then set the next oldest txn as the
// second xid in the pair, otherwise delete the entry from the reverse live list.
static int
live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), void*txnv) {
TOKUTXN txn = txnv;
TXNID xid = txn->txnid64; // xid of txn that is closing
TXNID live_xid = (TXNID)live_xidv; // xid on closing txn's live list
OMTVALUE pairv;
XID_PAIR pair;
uint32_t idx;
int r;
OMT reverse = txn->logger->live_list_reverse;
r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, (void *)live_xid, &pairv, &idx);
invariant(r==0);
pair = pairv;
invariant(pair->xid1 == live_xid); //sanity check
if (pair->xid2 == xid) {
//There is a record that needs to be either deleted or updated
TXNID olderxid;
OMTVALUE olderv;
uint32_t olderidx;
OMT snapshot = txn->logger->snapshot_txnids;
BOOL should_delete = TRUE;
// find the youngest txn in snapshot that is older than xid
r = toku_omt_find(snapshot, toku_find_xid_by_xid, (OMTVALUE) xid, -1, &olderv, &olderidx);
if (r==0) {
//There is an older txn
olderxid = (TXNID) olderv;
invariant(olderxid < xid);
if (olderxid >= live_xid) {
//older txn is new enough, we need to update.
pair->xid2 = olderxid;
should_delete = FALSE;
}
}
else {
invariant(r==DB_NOTFOUND);
}
if (should_delete) {
//Delete record
toku_free(pair);
r = toku_omt_delete_at(reverse, idx);
invariant(r==0);
}
}
else {
invariant(pair->xid2 > xid);
}
return r;
}
// When txn ends, update reverse live list. To do that, examine each txn in this (closing) txn's live list.
static int
live_list_reverse_note_txn_end(TOKUTXN txn) {
int r;
r = toku_omt_iterate(txn->live_root_txn_list, live_list_reverse_note_txn_end_iter, txn);
invariant(r==0);
return r;
}
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
static int find_xid (OMTVALUE v, void *txnv) {
TOKUTXN txn = v;
TOKUTXN txnfind = txnv;
if (txn->txnid64<txnfind->txnid64) return -1;
if (txn->txnid64>txnfind->txnid64) return +1;
return 0;
}
static int remove_txn (OMTVALUE hv, u_int32_t UU(idx), void *txnv) static int remove_txn (OMTVALUE hv, u_int32_t UU(idx), void *txnv)
// Effect: This function is called on every open BRT that a transaction used. // Effect: This function is called on every open BRT that a transaction used.
// This function removes the transaction from that BRT. // This function removes the transaction from that BRT.
...@@ -676,91 +367,17 @@ void toku_txn_complete_txn(TOKUTXN txn) { ...@@ -676,91 +367,17 @@ void toku_txn_complete_txn(TOKUTXN txn) {
assert(txn->spilled_rollback_head.b == ROLLBACK_NONE.b); assert(txn->spilled_rollback_head.b == ROLLBACK_NONE.b);
assert(txn->spilled_rollback_tail.b == ROLLBACK_NONE.b); assert(txn->spilled_rollback_tail.b == ROLLBACK_NONE.b);
assert(txn->current_rollback.b == ROLLBACK_NONE.b); assert(txn->current_rollback.b == ROLLBACK_NONE.b);
int r; toku_txn_manager_finish_txn(txn->logger->txn_manager, txn);
TOKULOGGER logger = txn->logger; // note that here is another place we depend on
toku_mutex_lock(&logger->txn_list_lock); // this function being called with the multi operation lock
{
{
//Remove txn from list (omt) of live transactions
OMTVALUE txnagain;
u_int32_t idx;
r = toku_omt_find_zero(logger->live_txns, find_xid, txn, &txnagain, &idx);
assert(r==0);
assert(txn==txnagain);
r = toku_omt_delete_at(logger->live_txns, idx);
assert(r==0);
}
if (txn->parent==NULL) {
OMTVALUE v;
u_int32_t idx;
//Remove txn from list of live root txns
r = toku_omt_find_zero(logger->live_root_txns, toku_find_xid_by_xid, (OMTVALUE)txn->txnid64, &v, &idx);
assert(r==0);
TXNID xid = (TXNID) v;
invariant(xid == txn->txnid64);
r = toku_omt_delete_at(logger->live_root_txns, idx);
assert(r==0);
}
//
// if this txn created a snapshot, make necessary modifications to list of snapshot txnids and live_list_reverse
// the order of operations is important. We first remove the txnid from the list of snapshot txnids. This is
// necessary because root snapshot transactions are in their own live lists. If we do not remove
// the txnid from the snapshot txnid list first, then when we go to make the modifications to
// live_list_reverse, we have trouble. We end up never removing (id, id) from live_list_reverse
//
if (txn->snapshot_type != TXN_SNAPSHOT_NONE && (txn->parent==NULL || txn->snapshot_type == TXN_SNAPSHOT_CHILD)) {
{
u_int32_t idx;
OMTVALUE v;
//Free memory used for snapshot_txnids
r = toku_omt_find_zero(logger->snapshot_txnids, toku_find_xid_by_xid, (OMTVALUE) txn->txnid64, &v, &idx);
invariant(r==0);
TXNID xid = (TXNID) v;
invariant(xid == txn->txnid64);
r = toku_omt_delete_at(logger->snapshot_txnids, idx);
invariant(r==0);
}
live_list_reverse_note_txn_end(txn);
{
//Free memory used for live root txns local list
invariant(toku_omt_size(txn->live_root_txn_list) > 0);
toku_omt_destroy(&txn->live_root_txn_list);
}
}
}
toku_mutex_unlock(&logger->txn_list_lock);
assert(logger->oldest_living_xid <= txn->txnid64);
if (txn->txnid64 == logger->oldest_living_xid) {
OMTVALUE oldest_txnv;
r = toku_omt_fetch(logger->live_txns, 0, &oldest_txnv);
if (r==0) {
TOKUTXN oldest_txn = oldest_txnv;
assert(oldest_txn != txn); // We just removed it
assert(oldest_txn->txnid64 > logger->oldest_living_xid); //Must be newer than the previous oldest
logger->oldest_living_xid = oldest_txn->txnid64;
logger->oldest_living_starttime = oldest_txn->starttime;
}
else {
//No living transactions
assert(r==EINVAL);
logger->oldest_living_xid = TXNID_NONE_LIVING;
logger->oldest_living_starttime = 0;
}
}
note_txn_closing(txn); note_txn_closing(txn);
} }
void toku_txn_destroy_txn(TOKUTXN txn) { void toku_txn_destroy_txn(TOKUTXN txn) {
if (garbage_collection_debug) if (txn->open_fts) {
verify_snapshot_system(txn->logger);
if (txn->open_fts)
toku_omt_destroy(&txn->open_fts); toku_omt_destroy(&txn->open_fts);
}
xids_destroy(&txn->xids); xids_destroy(&txn->xids);
toku_txn_ignore_free(txn); // 2954
toku_free(txn); toku_free(txn);
STATUS_VALUE(TXN_CLOSE)++; STATUS_VALUE(TXN_CLOSE)++;
...@@ -803,209 +420,6 @@ BOOL toku_is_txn_in_live_root_txn_list(OMT live_root_txn_list, TXNID xid) { ...@@ -803,209 +420,6 @@ BOOL toku_is_txn_in_live_root_txn_list(OMT live_root_txn_list, TXNID xid) {
return retval; return retval;
} }
static void
verify_snapshot_system(TOKULOGGER logger) {
int num_snapshot_txnids = toku_omt_size(logger->snapshot_txnids);
TXNID snapshot_txnids[num_snapshot_txnids];
int num_live_txns = toku_omt_size(logger->live_txns);
TOKUTXN live_txns[num_live_txns];
int num_live_list_reverse = toku_omt_size(logger->live_list_reverse);
XID_PAIR live_list_reverse[num_live_list_reverse];
int r;
int i;
int j;
//set up arrays for easier access
for (i = 0; i < num_snapshot_txnids; i++) {
OMTVALUE v;
r = toku_omt_fetch(logger->snapshot_txnids, i, &v);
assert_zero(r);
snapshot_txnids[i] = (TXNID) v;
}
for (i = 0; i < num_live_txns; i++) {
OMTVALUE v;
r = toku_omt_fetch(logger->live_txns, i, &v);
assert_zero(r);
live_txns[i] = v;
}
for (i = 0; i < num_live_list_reverse; i++) {
OMTVALUE v;
r = toku_omt_fetch(logger->live_list_reverse, i, &v);
assert_zero(r);
live_list_reverse[i] = v;
}
{
//Verify snapshot_txnids
for (i = 0; i < num_snapshot_txnids; i++) {
TXNID snapshot_xid = snapshot_txnids[i];
invariant(is_txnid_live(logger, snapshot_xid));
TOKUTXN snapshot_txn;
r = toku_txnid2txn(logger, snapshot_xid, &snapshot_txn);
assert_zero(r);
int num_live_root_txn_list = toku_omt_size(snapshot_txn->live_root_txn_list);
TXNID live_root_txn_list[num_live_root_txn_list];
{
for (j = 0; j < num_live_root_txn_list; j++) {
OMTVALUE v;
r = toku_omt_fetch(snapshot_txn->live_root_txn_list, j, &v);
assert_zero(r);
live_root_txn_list[j] = (TXNID)v;
}
}
for (j = 0; j < num_live_root_txn_list; j++) {
TXNID live_xid = live_root_txn_list[j];
invariant(live_xid <= snapshot_xid);
TXNID youngest = toku_get_youngest_live_list_txnid_for(
live_xid,
logger->live_list_reverse
);
invariant(youngest!=TXNID_NONE);
invariant(youngest>=snapshot_xid);
}
}
}
{
//Verify live_list_reverse
for (i = 0; i < num_live_list_reverse; i++) {
XID_PAIR pair = live_list_reverse[i];
invariant(pair->xid1 <= pair->xid2);
{
//verify pair->xid2 is in snapshot_xids
u_int32_t index;
OMTVALUE v2;
r = toku_omt_find_zero(logger->snapshot_txnids,
toku_find_xid_by_xid,
(OMTVALUE) pair->xid2, &v2, &index);
assert_zero(r);
}
for (j = 0; j < num_live_txns; j++) {
TOKUTXN txn = live_txns[j];
if (txn->snapshot_type != TXN_SNAPSHOT_NONE) {
BOOL expect = txn->snapshot_txnid64 >= pair->xid1 &&
txn->snapshot_txnid64 <= pair->xid2;
BOOL found = toku_is_txn_in_live_root_txn_list(txn->live_root_txn_list, pair->xid1);
invariant((expect==FALSE) == (found==FALSE));
}
}
}
}
{
//Verify live_txns
for (i = 0; i < num_live_txns; i++) {
TOKUTXN txn = live_txns[i];
BOOL expect = txn->snapshot_txnid64 == txn->txnid64;
{
//verify pair->xid2 is in snapshot_xids
u_int32_t index;
OMTVALUE v2;
r = toku_omt_find_zero(logger->snapshot_txnids,
toku_find_xid_by_xid,
(OMTVALUE) txn->txnid64, &v2, &index);
invariant(r==0 || r==DB_NOTFOUND);
invariant((r==0) == (expect!=0));
}
}
}
}
// routines for checking if rollback errors should be ignored because a hot index create was aborted
// 2954
// returns
// 0 on success
// ENOMEM if can't alloc memory
// EINVAL if txn = NULL
// -1 on other errors
void toku_txn_ignore_init(TOKUTXN txn) {
assert(txn);
TXN_IGNORE txni = &(txn->ignore_errors);
txni->fns_allocated = 0;
txni->filenums.num = 0;
txni->filenums.filenums = NULL;
}
void toku_txn_ignore_free(TOKUTXN txn) {
assert(txn);
TXN_IGNORE txni = &(txn->ignore_errors);
toku_free(txni->filenums.filenums);
txni->filenums.num = 0;
txni->filenums.filenums = NULL;
}
// returns
// 0 on success
// ENOMEM if can't alloc memory
// EINVAL if txn = NULL
// -1 on other errors
int toku_txn_ignore_add(TOKUTXN txn, FILENUM filenum) {
assert(txn);
// check for dups
if ( toku_txn_ignore_contains(txn, filenum) == 0 ) return 0;
// alloc more space if needed
const int N = 2;
TXN_IGNORE txni = &(txn->ignore_errors);
if ( txni->filenums.num == txni->fns_allocated ) {
if ( txni->fns_allocated == 0 ) {
CALLOC_N(N, txni->filenums.filenums);
if ( txni->filenums.filenums == NULL ) return ENOMEM;
txni->fns_allocated = N;
}
else {
XREALLOC_N(txni->fns_allocated * N, txni->filenums.filenums);
txni->fns_allocated = txni->fns_allocated * N;
}
}
txni->filenums.num++;
txni->filenums.filenums[txni->filenums.num - 1].fileid = filenum.fileid;
return 0;
}
// returns
// 0 on success
// ENOENT if not found
// EINVAL if txn = NULL
// -1 on other errors
// THIS FUNCTION IS NOT USED IN FUNCTIONAL CODE, BUT IS USEFUL FOR TESTING
int toku_txn_ignore_remove(TOKUTXN txn, FILENUM filenum) {
assert(txn);
TXN_IGNORE txni = &(txn->ignore_errors);
int found_fn = 0;
if ( txni->filenums.num == 0 ) return ENOENT;
for(uint32_t i=0; i<txni->filenums.num; i++) {
if ( !found_fn ) {
if ( txni->filenums.filenums[i].fileid == filenum.fileid ) {
found_fn = 1;
}
}
else { // remove bubble in array
txni->filenums.filenums[i-1].fileid = txni->filenums.filenums[i].fileid;
}
}
if ( !found_fn ) return ENOENT;
txni->filenums.num--;
return 0;
}
// returns
// 0 on success
// ENOENT if not found
// EINVAL if txn = NULL
// -1 on other errors
int toku_txn_ignore_contains(TOKUTXN txn, FILENUM filenum) {
assert(txn);
TXN_IGNORE txni = &(txn->ignore_errors);
for(uint32_t i=0; i<txni->filenums.num; i++) {
if ( txni->filenums.filenums[i].fileid == filenum.fileid ) {
return 0;
}
}
return ENOENT;
}
TOKUTXN_STATE TOKUTXN_STATE
toku_txn_get_state(TOKUTXN txn) { toku_txn_get_state(TOKUTXN txn) {
return txn->state; return txn->state;
......
...@@ -35,9 +35,6 @@ int toku_txn_begin_with_xid ( ...@@ -35,9 +35,6 @@ int toku_txn_begin_with_xid (
// Allocate and initialize a txn // Allocate and initialize a txn
int toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXNID xid, TXN_SNAPSHOT_TYPE snapshot_type, DB_TXN *container_db_txn); int toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXNID xid, TXN_SNAPSHOT_TYPE snapshot_type, DB_TXN *container_db_txn);
// Assign a txnid. Log the txn begin in the recovery log. Initialize the txn live lists.
void toku_txn_start_txn(TOKUTXN txn);
int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info); int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info);
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv,
...@@ -95,7 +92,7 @@ typedef struct { ...@@ -95,7 +92,7 @@ typedef struct {
TOKU_ENGINE_STATUS_ROW_S status[TXN_STATUS_NUM_ROWS]; TOKU_ENGINE_STATUS_ROW_S status[TXN_STATUS_NUM_ROWS];
} TXN_STATUS_S, *TXN_STATUS; } TXN_STATUS_S, *TXN_STATUS;
void toku_txn_get_status(TOKULOGGER logger, TXN_STATUS s); void toku_txn_get_status(TXN_STATUS s);
BOOL toku_is_txn_in_live_root_txn_list(OMT live_root_txn_list, TXNID xid); BOOL toku_is_txn_in_live_root_txn_list(OMT live_root_txn_list, TXNID xid);
...@@ -106,18 +103,6 @@ typedef struct { ...@@ -106,18 +103,6 @@ typedef struct {
TXNID xid2; TXNID xid2;
} XID_PAIR_S, *XID_PAIR; } XID_PAIR_S, *XID_PAIR;
// 2954
typedef struct tokutxn_filenum_ignore_errors {
uint32_t fns_allocated;
FILENUMS filenums;
} TXN_IGNORE_S, *TXN_IGNORE;
void toku_txn_ignore_init(TOKUTXN txn);
void toku_txn_ignore_free(TOKUTXN txn);
int toku_txn_ignore_add(TOKUTXN txn, FILENUM filenum);
int toku_txn_ignore_remove(TOKUTXN txn, FILENUM filenum);
int toku_txn_ignore_contains(TOKUTXN txn, FILENUM filenum);
#include "txn_state.h" #include "txn_state.h"
TOKUTXN_STATE toku_txn_get_state(TOKUTXN txn); TOKUTXN_STATE toku_txn_get_state(TOKUTXN txn);
...@@ -127,7 +112,6 @@ struct tokulogger_preplist { ...@@ -127,7 +112,6 @@ struct tokulogger_preplist {
DB_TXN *txn; DB_TXN *txn;
}; };
int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags); int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags);
int toku_logger_get_txn_from_xid (TOKULOGGER logger, TOKU_XA_XID *xid, DB_TXN **txnp);
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus) || defined(__cilkplusplus)
} }
......
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
#include "txn.h"
#include "checkpoint.h"
#include "ule.h"
#include <valgrind/helgrind.h>
#include "txn_manager.h"
struct txn_manager {
toku_mutex_t txn_manager_lock; // a lock protecting live_list_reverse and snapshot_txnids for now TODO: revisit this decision
OMT live_txns; // a sorted tree. Old comment said should be a hashtable. Do we still want that?
OMT live_root_txns; // a sorted tree.
OMT snapshot_txnids; //contains TXNID x | x is snapshot txn
//contains TXNID pairs (x,y) | y is oldest txnid s.t. x is in y's live list
// every TXNID that is in some snapshot's live list is used as the key for this OMT, x, as described above.
// The second half of the pair, y, is the youngest snapshot txnid (that is, has the highest LSN), such that x is in its live list.
// So, for example, Say T_800 begins, T_800 commits right after snapshot txn T_1100 begins. Then (800,1100) is in
// this list
OMT live_list_reverse;
TXNID oldest_living_xid;
time_t oldest_living_starttime; // timestamp in seconds of when txn with oldest_living_xid started
struct toku_list prepared_txns; // transactions that have been prepared and are unresolved, but have not been returned through txn_recover.
struct toku_list prepared_and_returned_txns; // transactions that have been prepared and unresolved, and have been returned through txn_recover. We need this list so that we can restart the recovery.
};
static TXN_STATUS_S txn_manager_status;
BOOL garbage_collection_debug = FALSE;
#define STATUS_INIT(k,t,l) { \
txn_manager_status.status[k].keyname = #k; \
txn_manager_status.status[k].type = t; \
txn_manager_status.status[k].legend = "txn: " l; \
}
static void
status_init(void) {
// Note, this function initializes the keyname, type, and legend fields.
// Value fields are initialized to zero by compiler.
STATUS_INIT(TXN_OLDEST_LIVE, UINT64, "xid of oldest live transaction");
STATUS_INIT(TXN_OLDEST_STARTTIME, UNIXTIME, "start time of oldest live transaction");
txn_manager_status.initialized = true;
}
#undef STATUS_INIT
#define STATUS_VALUE(x) txn_manager_status.status[x].value.num
static BOOL is_txnid_live(TXN_MANAGER txn_manager, TXNID txnid) {
TOKUTXN result = NULL;
toku_txn_manager_id2txn_unlocked(txn_manager, txnid, &result);
return (result != NULL);
}
static void
verify_snapshot_system(TXN_MANAGER txn_manager) {
int num_snapshot_txnids = toku_omt_size(txn_manager->snapshot_txnids);
TXNID snapshot_txnids[num_snapshot_txnids];
int num_live_txns = toku_omt_size(txn_manager->live_txns);
TOKUTXN live_txns[num_live_txns];
int num_live_list_reverse = toku_omt_size(txn_manager->live_list_reverse);
XID_PAIR live_list_reverse[num_live_list_reverse];
int r;
int i;
int j;
//set up arrays for easier access
for (i = 0; i < num_snapshot_txnids; i++) {
OMTVALUE v;
r = toku_omt_fetch(txn_manager->snapshot_txnids, i, &v);
assert_zero(r);
snapshot_txnids[i] = (TXNID) v;
}
for (i = 0; i < num_live_txns; i++) {
OMTVALUE v;
r = toku_omt_fetch(txn_manager->live_txns, i, &v);
assert_zero(r);
live_txns[i] = v;
}
for (i = 0; i < num_live_list_reverse; i++) {
OMTVALUE v;
r = toku_omt_fetch(txn_manager->live_list_reverse, i, &v);
assert_zero(r);
live_list_reverse[i] = v;
}
{
//Verify snapshot_txnids
for (i = 0; i < num_snapshot_txnids; i++) {
TXNID snapshot_xid = snapshot_txnids[i];
invariant(is_txnid_live(txn_manager, snapshot_xid));
TOKUTXN snapshot_txn;
toku_txn_manager_id2txn_unlocked(txn_manager, snapshot_xid, &snapshot_txn);
int num_live_root_txn_list = toku_omt_size(snapshot_txn->live_root_txn_list);
TXNID live_root_txn_list[num_live_root_txn_list];
{
for (j = 0; j < num_live_root_txn_list; j++) {
OMTVALUE v;
r = toku_omt_fetch(snapshot_txn->live_root_txn_list, j, &v);
assert_zero(r);
live_root_txn_list[j] = (TXNID)v;
}
}
for (j = 0; j < num_live_root_txn_list; j++) {
TXNID live_xid = live_root_txn_list[j];
invariant(live_xid <= snapshot_xid);
TXNID youngest = toku_get_youngest_live_list_txnid_for(
live_xid,
txn_manager->live_list_reverse
);
invariant(youngest!=TXNID_NONE);
invariant(youngest>=snapshot_xid);
}
}
}
{
//Verify live_list_reverse
for (i = 0; i < num_live_list_reverse; i++) {
XID_PAIR pair = live_list_reverse[i];
invariant(pair->xid1 <= pair->xid2);
{
//verify pair->xid2 is in snapshot_xids
u_int32_t index;
OMTVALUE v2;
r = toku_omt_find_zero(txn_manager->snapshot_txnids,
toku_find_xid_by_xid,
(OMTVALUE) pair->xid2, &v2, &index);
assert_zero(r);
}
for (j = 0; j < num_live_txns; j++) {
TOKUTXN txn = live_txns[j];
if (txn->snapshot_type != TXN_SNAPSHOT_NONE) {
BOOL expect = txn->snapshot_txnid64 >= pair->xid1 &&
txn->snapshot_txnid64 <= pair->xid2;
BOOL found = toku_is_txn_in_live_root_txn_list(txn->live_root_txn_list, pair->xid1);
invariant((expect==FALSE) == (found==FALSE));
}
}
}
}
{
//Verify live_txns
for (i = 0; i < num_live_txns; i++) {
TOKUTXN txn = live_txns[i];
BOOL expect = txn->snapshot_txnid64 == txn->txnid64;
{
//verify pair->xid2 is in snapshot_xids
u_int32_t index;
OMTVALUE v2;
r = toku_omt_find_zero(txn_manager->snapshot_txnids,
toku_find_xid_by_xid,
(OMTVALUE) txn->txnid64, &v2, &index);
invariant(r==0 || r==DB_NOTFOUND);
invariant((r==0) == (expect!=0));
}
}
}
}
static TXNID txn_manager_get_oldest_living_xid_unlocked(
TXN_MANAGER txn_manager,
time_t * oldest_living_starttime
);
void toku_txn_manager_get_status(TOKULOGGER logger, TXN_STATUS s) {
if (!txn_manager_status.initialized) {
status_init();
}
{
if (logger) {
time_t oldest_starttime;
STATUS_VALUE(TXN_OLDEST_LIVE) = txn_manager_get_oldest_living_xid_unlocked(logger->txn_manager, &oldest_starttime);
STATUS_VALUE(TXN_OLDEST_STARTTIME) = (uint64_t) oldest_starttime;
}
}
*s = txn_manager_status;
}
void toku_txn_manager_init(TXN_MANAGER* txn_managerp) {
int r = 0;
TXN_MANAGER XMALLOC(txn_manager);
toku_mutex_init(&txn_manager->txn_manager_lock, NULL);
r = toku_omt_create(&txn_manager->live_txns);
assert_zero(r);
r = toku_omt_create(&txn_manager->live_root_txns);
assert_zero(r);
r = toku_omt_create(&txn_manager->snapshot_txnids);
assert_zero(r);
r = toku_omt_create(&txn_manager->live_list_reverse);
assert_zero(r);
txn_manager->oldest_living_xid = TXNID_NONE_LIVING;
txn_manager->oldest_living_starttime = 0;
toku_list_init(&txn_manager->prepared_txns);
toku_list_init(&txn_manager->prepared_and_returned_txns);
*txn_managerp = txn_manager;
}
void toku_txn_manager_destroy(TXN_MANAGER txn_manager) {
toku_mutex_destroy(&txn_manager->txn_manager_lock);
toku_omt_destroy(&txn_manager->live_txns);
toku_omt_destroy(&txn_manager->live_root_txns);
toku_omt_destroy(&txn_manager->snapshot_txnids);
toku_omt_destroy(&txn_manager->live_list_reverse);
toku_free(txn_manager);
}
static TXNID txn_manager_get_oldest_living_xid_unlocked(
TXN_MANAGER txn_manager,
time_t * oldest_living_starttime
)
{
TXNID rval = 0;
rval = txn_manager->oldest_living_xid;
if (oldest_living_starttime) {
*oldest_living_starttime = txn_manager->oldest_living_starttime;
}
return rval;
}
TXNID toku_txn_manager_get_oldest_living_xid(TXN_MANAGER txn_manager, time_t * oldest_living_starttime) {
TXNID rval = 0;
toku_mutex_lock(&txn_manager->txn_manager_lock);
rval = txn_manager_get_oldest_living_xid_unlocked(txn_manager, oldest_living_starttime);
toku_mutex_unlock(&txn_manager->txn_manager_lock);
return rval;
}
// Create list of root transactions that were live when this txn began.
static int
setup_live_root_txn_list(TXN_MANAGER txn_manager, TOKUTXN txn) {
OMT global = txn_manager->live_root_txns;
int r = toku_omt_clone_noptr(
&txn->live_root_txn_list,
global
);
return r;
}
// Add this txn to the global list of txns that have their own snapshots.
// (Note, if a txn is a child that creates its own snapshot, then that child xid
// is the xid stored in the global list.)
static int
snapshot_txnids_note_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
int r;
OMT txnids = txn_manager->snapshot_txnids;
r = toku_omt_insert_at(txnids, (OMTVALUE) txn->txnid64, toku_omt_size(txnids));
assert_zero(r);
return r;
}
// If live txn is not in reverse live list, then add it.
// If live txn is in reverse live list, update it by setting second xid in pair to new txn that is being started.
static int
live_list_reverse_note_txn_start_iter(OMTVALUE live_xidv, u_int32_t UU(index), void*txnv) {
TOKUTXN txn = txnv;
TXNID xid = txn->txnid64; // xid of new txn that is being started
TXNID live_xid = (TXNID)live_xidv; // xid on the new txn's live list
OMTVALUE pairv;
XID_PAIR pair;
uint32_t idx;
int r;
OMT reverse = txn->logger->txn_manager->live_list_reverse;
r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, (void *)live_xid, &pairv, &idx);
if (r==0) {
pair = pairv;
invariant(pair->xid1 == live_xid); //sanity check
invariant(pair->xid2 < xid); //Must be older
pair->xid2 = txn->txnid64;
}
else {
invariant(r==DB_NOTFOUND);
//Make new entry
XMALLOC(pair);
pair->xid1 = live_xid;
pair->xid2 = txn->txnid64;
r = toku_omt_insert_at(reverse, pair, idx);
assert_zero(r);
}
return r;
}
// Maintain the reverse live list. The reverse live list is a list of xid pairs. The first xid in the pair
// is a txn that was live when some txn began, and the second xid in the pair is the newest still-live xid to
// have that first xid in its live list. (The first xid may be closed, it only needed to be live when the
// second txn began.)
// When a new txn begins, we need to scan the live list of this new txn. For each live txn, we either
// add it to the reverse live list (if it is not already there), or update to the reverse live list so
// that this new txn is the second xid in the pair associated with the txn in the live list.
static int
live_list_reverse_note_txn_start(TOKUTXN txn) {
int r;
r = toku_omt_iterate(txn->live_root_txn_list, live_list_reverse_note_txn_start_iter, txn);
assert_zero(r);
return r;
}
void toku_txn_manager_start_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
TOKUTXN parent = txn->parent;
int r;
// we take the txn_manager_lock before writing to the log
// we may be able to move this lock acquisition
// down to just before inserting into logger->live_txns
// if we know the caller has the multi operation lock
toku_mutex_lock(&txn_manager->txn_manager_lock);
if (garbage_collection_debug) {
verify_snapshot_system(txn_manager);
}
if (txn->txnid64 == TXNID_NONE) {
LSN first_lsn;
r = toku_log_xbegin(txn->logger, &first_lsn, 0, parent ? parent->txnid64 : 0);
assert_zero(r);
txn->txnid64 = first_lsn.lsn;
}
XIDS parent_xids;
if (parent == NULL)
parent_xids = xids_get_root_xids();
else
parent_xids = parent->xids;
r = xids_create_child(parent_xids, &txn->xids, txn->txnid64);
assert_zero(r);
if (toku_omt_size(txn_manager->live_txns) == 0) {
assert(txn_manager->oldest_living_xid == TXNID_NONE_LIVING);
txn_manager->oldest_living_xid = txn->txnid64;
txn_manager->oldest_living_starttime = txn->starttime;
}
assert(txn_manager->oldest_living_xid <= txn->txnid64);
{
//Add txn to list (omt) of live transactions
//We know it is the newest one.
r = toku_omt_insert_at(txn_manager->live_txns, txn, toku_omt_size(txn_manager->live_txns));
assert_zero(r);
//
// maintain the data structures necessary for MVCC:
// 1. add txn to list of live_root_txns if this is a root transaction
// 2. if the transaction is creating a snapshot:
// - create a live list for the transaction
// - add the id to the list of snapshot ids
// - make the necessary modifications to the live_list_reverse
//
// The order of operations is important here, and must be taken
// into account when the transaction is closed. The txn is added
// to the live_root_txns first (if it is a root txn). This has the implication
// that a root level snapshot transaction is in its own live list. This fact
// is taken into account when the transaction is closed.
//
// add ancestor information, and maintain global live root txn list
if (parent == NULL) {
//Add txn to list (omt) of live root txns
r = toku_omt_insert_at(
txn_manager->live_root_txns,
(OMTVALUE) txn->txnid64,
toku_omt_size(txn_manager->live_root_txns)
); //We know it is the newest one.
assert_zero(r);
txn->ancestor_txnid64 = txn->txnid64;
}
else {
txn->ancestor_txnid64 = parent->ancestor_txnid64;
}
// setup information for snapshot reads
if (txn->snapshot_type != TXN_SNAPSHOT_NONE) {
// in this case, either this is a root level transaction that needs its live list setup, or it
// is a child transaction that specifically asked for its own snapshot
if (parent == NULL || txn->snapshot_type == TXN_SNAPSHOT_CHILD) {
r = setup_live_root_txn_list(txn_manager, txn);
assert_zero(r);
txn->snapshot_txnid64 = txn->txnid64;
r = snapshot_txnids_note_txn(txn_manager, txn);
assert_zero(r);
r = live_list_reverse_note_txn_start(txn);
assert_zero(r);
}
// in this case, it is a child transaction that specified its snapshot to be that
// of the root transaction
else if (txn->snapshot_type == TXN_SNAPSHOT_ROOT) {
txn->live_root_txn_list = parent->live_root_txn_list;
txn->snapshot_txnid64 = parent->snapshot_txnid64;
}
else {
assert(FALSE);
}
}
}
if (garbage_collection_debug) {
verify_snapshot_system(txn_manager);
}
toku_mutex_unlock(&txn_manager->txn_manager_lock);
}
// For each xid on the closing txn's live list, find the corresponding entry in the reverse live list.
// There must be one.
// If the second xid in the pair is not the xid of the closing transaction, then the second xid must be newer
// than the closing txn, and there is nothing to be done (except to assert the invariant).
// If the second xid in the pair is the xid of the closing transaction, then we need to find the next oldest
// txn. If the live_xid is in the live list of the next oldest txn, then set the next oldest txn as the
// second xid in the pair, otherwise delete the entry from the reverse live list.
static int
live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), void*txnv) {
TOKUTXN txn = txnv;
TXNID xid = txn->txnid64; // xid of txn that is closing
TXNID live_xid = (TXNID)live_xidv; // xid on closing txn's live list
OMTVALUE pairv;
XID_PAIR pair;
uint32_t idx;
int r;
OMT reverse = txn->logger->txn_manager->live_list_reverse;
r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, (void *)live_xid, &pairv, &idx);
invariant(r==0);
pair = pairv;
invariant(pair->xid1 == live_xid); //sanity check
if (pair->xid2 == xid) {
//There is a record that needs to be either deleted or updated
TXNID olderxid;
OMTVALUE olderv;
uint32_t olderidx;
OMT snapshot = txn->logger->txn_manager->snapshot_txnids;
BOOL should_delete = TRUE;
// find the youngest txn in snapshot that is older than xid
r = toku_omt_find(snapshot, toku_find_xid_by_xid, (OMTVALUE) xid, -1, &olderv, &olderidx);
if (r==0) {
//There is an older txn
olderxid = (TXNID) olderv;
invariant(olderxid < xid);
if (olderxid >= live_xid) {
//older txn is new enough, we need to update.
pair->xid2 = olderxid;
should_delete = FALSE;
}
}
else {
invariant(r==DB_NOTFOUND);
}
if (should_delete) {
//Delete record
toku_free(pair);
r = toku_omt_delete_at(reverse, idx);
invariant(r==0);
}
}
else {
invariant(pair->xid2 > xid);
}
return r;
}
// When txn ends, update reverse live list. To do that, examine each txn in this (closing) txn's live list.
static int
live_list_reverse_note_txn_end(TOKUTXN txn) {
int r;
r = toku_omt_iterate(txn->live_root_txn_list, live_list_reverse_note_txn_end_iter, txn);
invariant(r==0);
return r;
}
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
static int find_xid (OMTVALUE v, void *txnv) {
TOKUTXN txn = v;
TOKUTXN txnfind = txnv;
if (txn->txnid64<txnfind->txnid64) return -1;
if (txn->txnid64>txnfind->txnid64) return +1;
return 0;
}
void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
int r;
toku_mutex_lock(&txn_manager->txn_manager_lock);
{
if (garbage_collection_debug) {
verify_snapshot_system(txn_manager);
}
{
//Remove txn from list (omt) of live transactions
OMTVALUE txnagain;
u_int32_t idx;
r = toku_omt_find_zero(txn_manager->live_txns, find_xid, txn, &txnagain, &idx);
assert(r==0);
assert(txn==txnagain);
r = toku_omt_delete_at(txn_manager->live_txns, idx);
assert(r==0);
}
if (txn->parent==NULL) {
OMTVALUE v;
u_int32_t idx;
//Remove txn from list of live root txns
r = toku_omt_find_zero(txn_manager->live_root_txns, toku_find_xid_by_xid, (OMTVALUE)txn->txnid64, &v, &idx);
assert(r==0);
TXNID xid = (TXNID) v;
invariant(xid == txn->txnid64);
r = toku_omt_delete_at(txn_manager->live_root_txns, idx);
assert(r==0);
}
//
// if this txn created a snapshot, make necessary modifications to list of snapshot txnids and live_list_reverse
// the order of operations is important. We first remove the txnid from the list of snapshot txnids. This is
// necessary because root snapshot transactions are in their own live lists. If we do not remove
// the txnid from the snapshot txnid list first, then when we go to make the modifications to
// live_list_reverse, we have trouble. We end up never removing (id, id) from live_list_reverse
//
if (txn->snapshot_type != TXN_SNAPSHOT_NONE && (txn->parent==NULL || txn->snapshot_type == TXN_SNAPSHOT_CHILD)) {
{
u_int32_t idx;
OMTVALUE v;
//Free memory used for snapshot_txnids
r = toku_omt_find_zero(txn_manager->snapshot_txnids, toku_find_xid_by_xid, (OMTVALUE) txn->txnid64, &v, &idx);
invariant(r==0);
TXNID xid = (TXNID) v;
invariant(xid == txn->txnid64);
r = toku_omt_delete_at(txn_manager->snapshot_txnids, idx);
invariant(r==0);
}
live_list_reverse_note_txn_end(txn);
{
//Free memory used for live root txns local list
invariant(toku_omt_size(txn->live_root_txn_list) > 0);
toku_omt_destroy(&txn->live_root_txn_list);
}
}
}
assert(txn_manager->oldest_living_xid <= txn->txnid64);
if (txn->txnid64 == txn_manager->oldest_living_xid) {
OMTVALUE oldest_txnv;
r = toku_omt_fetch(txn_manager->live_txns, 0, &oldest_txnv);
if (r==0) {
TOKUTXN oldest_txn = oldest_txnv;
assert(oldest_txn != txn); // We just removed it
assert(oldest_txn->txnid64 > txn_manager->oldest_living_xid); //Must be newer than the previous oldest
txn_manager->oldest_living_xid = oldest_txn->txnid64;
txn_manager->oldest_living_starttime = oldest_txn->starttime;
}
else {
//No living transactions
assert(r==EINVAL);
txn_manager->oldest_living_xid = TXNID_NONE_LIVING;
txn_manager->oldest_living_starttime = 0;
}
}
if (garbage_collection_debug) {
verify_snapshot_system(txn_manager);
}
toku_mutex_unlock(&txn_manager->txn_manager_lock);
}
void toku_txn_manager_clone_state_for_gc(
TXN_MANAGER txn_manager,
OMT* snapshot_xids,
OMT* live_list_reverse,
OMT* live_root_txns
)
{
int r = 0;
toku_mutex_lock(&txn_manager->txn_manager_lock);
r = toku_omt_clone_noptr(snapshot_xids,
txn_manager->snapshot_txnids);
assert_zero(r);
r = toku_omt_clone_pool(live_list_reverse,
txn_manager->live_list_reverse,
sizeof(XID_PAIR_S));
assert_zero(r);
r = toku_omt_clone_noptr(live_root_txns,
txn_manager->live_root_txns);
assert_zero(r);
toku_mutex_unlock(&txn_manager->txn_manager_lock);
}
//Heaviside function to search through an OMT by a TXNID
static int
find_by_xid (OMTVALUE v, void *txnidv) {
TOKUTXN txn = v;
TXNID txnidfind = *(TXNID*)txnidv;
if (txn->txnid64<txnidfind) return -1;
if (txn->txnid64>txnidfind) return +1;
return 0;
}
void toku_txn_manager_id2txn_unlocked(TXN_MANAGER txn_manager, TXNID txnid, TOKUTXN *result) {
OMTVALUE txnfound;
int r = toku_omt_find_zero(txn_manager->live_txns, find_by_xid, &txnid, &txnfound, NULL);
if (r==0) {
TOKUTXN txn = txnfound;
assert(txn->txnid64==txnid);
*result = txn;
}
else {
assert(r==DB_NOTFOUND);
// If there is no txn, then we treat it as the null txn.
*result = NULL;
}
}
void toku_txn_manager_id2txn(TXN_MANAGER txn_manager, TXNID txnid, TOKUTXN *result) {
toku_mutex_lock(&txn_manager->txn_manager_lock);
toku_txn_manager_id2txn_unlocked(txn_manager, txnid, result);
toku_mutex_unlock(&txn_manager->txn_manager_lock);
}
int toku_txn_manager_get_txn_from_xid (TXN_MANAGER txn_manager, TOKU_XA_XID *xid, DB_TXN **txnp) {
toku_mutex_lock(&txn_manager->txn_manager_lock);
int ret_val = 0;
int num_live_txns = toku_omt_size(txn_manager->live_txns);
for (int i = 0; i < num_live_txns; i++) {
OMTVALUE v;
{
int r = toku_omt_fetch(txn_manager->live_txns, i, &v);
assert_zero(r);
}
TOKUTXN txn = v;
if (txn->xa_xid.formatID == xid->formatID
&& txn->xa_xid.gtrid_length == xid->gtrid_length
&& txn->xa_xid.bqual_length == xid->bqual_length
&& 0==memcmp(txn->xa_xid.data, xid->data, xid->gtrid_length + xid->bqual_length)) {
*txnp = txn->container_db_txn;
ret_val = 0;
goto exit;
}
}
ret_val = DB_NOTFOUND;
exit:
toku_mutex_unlock(&txn_manager->txn_manager_lock);
return ret_val;
}
u_int32_t toku_txn_manager_num_live_txns(TXN_MANAGER txn_manager) {
int ret_val = 0;
toku_mutex_lock(&txn_manager->txn_manager_lock);
ret_val = toku_omt_size(txn_manager->live_txns);
toku_mutex_unlock(&txn_manager->txn_manager_lock);
return ret_val;
}
int toku_txn_manager_iter_over_live_txns(
TXN_MANAGER txn_manager,
int (*f)(OMTVALUE, u_int32_t, void*),
void* v
)
{
int r = 0;
toku_mutex_lock(&txn_manager->txn_manager_lock);
r = toku_omt_iterate(txn_manager->live_txns, f, v);
toku_mutex_unlock(&txn_manager->txn_manager_lock);
return r;
}
void toku_txn_manager_add_prepared_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
toku_mutex_lock(&txn_manager->txn_manager_lock);
assert(txn->state==TOKUTXN_LIVE);
txn->state = TOKUTXN_PREPARING; // This state transition must be protected against begin_checkpoint. Right now it uses the ydb lock.
toku_list_push(&txn_manager->prepared_txns, &txn->prepared_txns_link);
toku_mutex_unlock(&txn_manager->txn_manager_lock);
}
static void invalidate_xa_xid (TOKU_XA_XID *xid) {
ANNOTATE_NEW_MEMORY(xid, sizeof(*xid)); // consider it to be all invalid for valgrind
xid->formatID = -1; // According to the XA spec, -1 means "invalid data"
}
void toku_txn_manager_note_abort_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
toku_mutex_lock(&txn_manager->txn_manager_lock);
if (txn->state==TOKUTXN_PREPARING) {
invalidate_xa_xid(&txn->xa_xid);
toku_list_remove(&txn->prepared_txns_link);
}
txn->state = TOKUTXN_ABORTING;
toku_mutex_unlock(&txn_manager->txn_manager_lock);
}
void toku_txn_manager_note_commit_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
toku_mutex_lock(&txn_manager->txn_manager_lock);
if (txn->state==TOKUTXN_PREPARING) {
invalidate_xa_xid(&txn->xa_xid);
toku_list_remove(&txn->prepared_txns_link);
}
txn->state = TOKUTXN_COMMITTING;
toku_mutex_unlock(&txn_manager->txn_manager_lock);
}
int toku_txn_manager_recover_txn (
TXN_MANAGER txn_manager,
struct tokulogger_preplist preplist[/*count*/],
long count,
long *retp, /*out*/
u_int32_t flags
)
{
int ret_val = 0;
toku_mutex_lock(&txn_manager->txn_manager_lock);
if (flags==DB_FIRST) {
// Anything in the returned list goes back on the prepared list.
while (!toku_list_empty(&txn_manager->prepared_and_returned_txns)) {
struct toku_list *h = toku_list_head(&txn_manager->prepared_and_returned_txns);
toku_list_remove(h);
toku_list_push(&txn_manager->prepared_txns, h);
}
} else if (flags!=DB_NEXT) {
ret_val = EINVAL;
goto exit;
}
long i;
for (i=0; i<count; i++) {
if (!toku_list_empty(&txn_manager->prepared_txns)) {
struct toku_list *h = toku_list_head(&txn_manager->prepared_txns);
toku_list_remove(h);
toku_list_push(&txn_manager->prepared_and_returned_txns, h);
TOKUTXN txn = toku_list_struct(h, struct tokutxn, prepared_txns_link);
assert(txn->container_db_txn);
preplist[i].txn = txn->container_db_txn;
preplist[i].xid = txn->xa_xid;
} else {
break;
}
}
*retp = i;
ret_val = 0;
exit:
toku_mutex_unlock(&txn_manager->txn_manager_lock);
return ret_val;
}
// needed for hot indexing
void toku_txn_manager_suspend(TXN_MANAGER txn_manager) {
toku_mutex_lock(&txn_manager->txn_manager_lock);
}
void toku_txn_manager_resume(TXN_MANAGER txn_manager) {
toku_mutex_unlock(&txn_manager->txn_manager_lock);
}
#undef STATUS_VALUE
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ifndef TOKUTXN_MANAGER_H
#define TOKUTXN_MANAGER_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
struct txn_manager;
void toku_txn_manager_get_status(TOKULOGGER logger, TXN_STATUS s);
void toku_txn_manager_init(TXN_MANAGER* txn_manager);
void toku_txn_manager_destroy(TXN_MANAGER txn_manager);
TXNID toku_txn_manager_get_oldest_living_xid(TXN_MANAGER txn_manager, time_t * oldest_living_starttime);
// Assign a txnid. Log the txn begin in the recovery log. Initialize the txn live lists.
void toku_txn_manager_start_txn(TXN_MANAGER txn_manager, TOKUTXN txn);
void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn);
void toku_txn_manager_clone_state_for_gc(
TXN_MANAGER txn_manager,
OMT* snapshot_xids,
OMT* live_list_reverse,
OMT* live_root_txns
);
void toku_txn_manager_id2txn_unlocked(TXN_MANAGER txn_manager, TXNID txnid, TOKUTXN *result);
void toku_txn_manager_id2txn (TXN_MANAGER txn_manager, TXNID txnid, TOKUTXN *result);
int toku_txn_manager_get_txn_from_xid (TXN_MANAGER txn_manager, TOKU_XA_XID *xid, DB_TXN **txnp);
u_int32_t toku_txn_manager_num_live_txns(TXN_MANAGER txn_manager);
int toku_txn_manager_iter_over_live_txns(
TXN_MANAGER txn_manager,
int (*f)(OMTVALUE, u_int32_t, void*),
void* v
);
void toku_txn_manager_add_prepared_txn(TXN_MANAGER txn_manager, TOKUTXN txn);
void toku_txn_manager_note_abort_txn(TXN_MANAGER txn_manager, TOKUTXN txn);
void toku_txn_manager_note_commit_txn(TXN_MANAGER txn_manager, TOKUTXN txn);
int toku_txn_manager_recover_txn(
TXN_MANAGER txn_manager,
struct tokulogger_preplist preplist[/*count*/],
long count,
long *retp, /*out*/
u_int32_t flags
);
void toku_txn_manager_suspend(TXN_MANAGER txn_manager);
void toku_txn_manager_resume(TXN_MANAGER txn_manager);
#if defined(__cplusplus) || defined(__cilkplusplus)
}
#endif
#endif //TOKUTXN_H
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include <ft/leafentry.h> #include <ft/leafentry.h>
#include <ft/ule.h> #include <ft/ule.h>
#include <ft/xids.h> #include <ft/xids.h>
#include "ft/txn_manager.h"
#include "ydb_row_lock.h" #include "ydb_row_lock.h"
#include "indexer-internal.h" #include "indexer-internal.h"
...@@ -181,6 +182,7 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) { ...@@ -181,6 +182,7 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
int result = 0; int result = 0;
indexer_commit_keys_set_empty(&indexer->i->commit_keys); indexer_commit_keys_set_empty(&indexer->i->commit_keys);
toku_txn_manager_suspend(toku_logger_get_txn_manager(indexer->i->env->i->logger));
// init the xids to the root xid // init the xids to the root xid
XIDS xids = xids_get_root_xids(); XIDS xids = xids_get_root_xids();
...@@ -305,7 +307,8 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) { ...@@ -305,7 +307,8 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
result = indexer_ft_commit(indexer, hotdb, &indexer->i->commit_keys.keys[i], xids); result = indexer_ft_commit(indexer, hotdb, &indexer->i->commit_keys.keys[i], xids);
xids_destroy(&xids); xids_destroy(&xids);
toku_txn_manager_resume(toku_logger_get_txn_manager(indexer->i->env->i->logger));
return result; return result;
} }
...@@ -390,8 +393,11 @@ indexer_xid_state(DB_INDEXER *indexer, TXNID xid) { ...@@ -390,8 +393,11 @@ indexer_xid_state(DB_INDEXER *indexer, TXNID xid) {
} else { } else {
DB_ENV *env = indexer->i->env; DB_ENV *env = indexer->i->env;
TOKUTXN txn = NULL; TOKUTXN txn = NULL;
int r = toku_txnid2txn(env->i->logger, xid, &txn); toku_txn_manager_id2txn_unlocked(
assert(r == 0); toku_logger_get_txn_manager(env->i->logger),
xid,
&txn
);
if (txn) if (txn)
result = toku_txn_get_state(txn); result = toku_txn_get_state(txn);
else else
...@@ -410,8 +416,12 @@ indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_ ...@@ -410,8 +416,12 @@ indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_
} else { } else {
DB_ENV *env = indexer->i->env; DB_ENV *env = indexer->i->env;
TOKUTXN txn = NULL; TOKUTXN txn = NULL;
result = toku_txnid2txn(env->i->logger, outermost_live_xid, &txn); toku_txn_manager_id2txn_unlocked(
assert(result == 0 && txn != NULL); toku_logger_get_txn_manager(env->i->logger),
outermost_live_xid,
&txn
);
assert(txn != NULL);
result = toku_grab_write_lock(hotdb, key, txn); result = toku_grab_write_lock(hotdb, key, txn);
} }
return result; return result;
...@@ -444,8 +454,11 @@ indexer_get_innermost_live_txn(DB_INDEXER *indexer, XIDS xids) { ...@@ -444,8 +454,11 @@ indexer_get_innermost_live_txn(DB_INDEXER *indexer, XIDS xids) {
uint8_t num_xids = xids_get_num_xids(xids); uint8_t num_xids = xids_get_num_xids(xids);
TXNID xid = xids_get_xid(xids, (u_int8_t)(num_xids-1)); TXNID xid = xids_get_xid(xids, (u_int8_t)(num_xids-1));
TOKUTXN txn = NULL; TOKUTXN txn = NULL;
int result = toku_txnid2txn(env->i->logger, xid, &txn); toku_txn_manager_id2txn_unlocked(
assert(result == 0); toku_logger_get_txn_manager(env->i->logger),
xid,
&txn
);
return txn; return txn;
} }
......
...@@ -193,6 +193,26 @@ toku_indexer_create_indexer(DB_ENV *env, ...@@ -193,6 +193,26 @@ toku_indexer_create_indexer(DB_ENV *env,
indexer->build = build_index; indexer->build = build_index;
indexer->close = close_indexer; indexer->close = close_indexer;
indexer->abort = abort_indexer; indexer->abort = abort_indexer;
toku_ydb_unlock();
//
// create and close a dummy loader to get redirection going for the hot indexer
// This way, if the hot index aborts, but other transactions have references to the
// underlying FT, then those transactions can do dummy operations on the FT
// while the DB gets redirected back to an empty dictionary
//
for (int i = 0; i < N; i++) {
DB_LOADER* loader = NULL;
int r = env->create_loader(env, txn, &loader, dest_dbs[i], 1, &dest_dbs[i], NULL, NULL, DB_PRELOCKED_WRITE);
if (r) {
goto create_exit;
}
r = loader->close(loader);
if (r) {
goto create_exit;
}
}
toku_ydb_lock();
// create and initialize the leafentry cursor // create and initialize the leafentry cursor
rval = le_cursor_create(&indexer->i->lec, db_struct_i(src_db)->ft_handle, db_txn_struct_i(txn)->tokutxn); rval = le_cursor_create(&indexer->i->lec, db_struct_i(src_db)->ft_handle, db_txn_struct_i(txn)->tokutxn);
......
...@@ -41,6 +41,7 @@ const char *toku_copyright_string = "Copyright (c) 2007-2009 Tokutek Inc. All r ...@@ -41,6 +41,7 @@ const char *toku_copyright_string = "Copyright (c) 2007-2009 Tokutek Inc. All r
#include "ydb_db.h" #include "ydb_db.h"
#include "ydb_write.h" #include "ydb_write.h"
#include "ydb_txn.h" #include "ydb_txn.h"
#include "ft/txn_manager.h"
#ifdef TOKUTRACE #ifdef TOKUTRACE
#define DB_ENV_CREATE_FUN db_env_create_toku10 #define DB_ENV_CREATE_FUN db_env_create_toku10
...@@ -1510,7 +1511,7 @@ locked_env_txn_xa_recover (DB_ENV *env, TOKU_XA_XID xids[/*count*/], long count, ...@@ -1510,7 +1511,7 @@ locked_env_txn_xa_recover (DB_ENV *env, TOKU_XA_XID xids[/*count*/], long count,
static int static int
toku_env_get_txn_from_xid (DB_ENV *env, /*in*/ TOKU_XA_XID *xid, /*out*/ DB_TXN **txnp) { toku_env_get_txn_from_xid (DB_ENV *env, /*in*/ TOKU_XA_XID *xid, /*out*/ DB_TXN **txnp) {
return toku_logger_get_txn_from_xid(env->i->logger, xid, txnp); return toku_txn_manager_get_txn_from_xid(toku_logger_get_txn_manager(env->i->logger), xid, txnp);
} }
static int static int
...@@ -2159,7 +2160,7 @@ env_get_engine_status (DB_ENV * env, TOKU_ENGINE_STATUS_ROW engstat, uint64_t ma ...@@ -2159,7 +2160,7 @@ env_get_engine_status (DB_ENV * env, TOKU_ENGINE_STATUS_ROW engstat, uint64_t ma
} }
{ {
TXN_STATUS_S txnstat; TXN_STATUS_S txnstat;
toku_txn_get_status(env->i->logger, &txnstat); toku_txn_get_status(&txnstat);
for (int i = 0; i < TXN_STATUS_NUM_ROWS && row < maxrows; i++) { for (int i = 0; i < TXN_STATUS_NUM_ROWS && row < maxrows; i++) {
engstat[row++] = txnstat.status[i]; engstat[row++] = txnstat.status[i];
} }
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#include "ydb_txn.h" #include "ydb_txn.h"
#include <lock_tree/lth.h> #include <lock_tree/lth.h>
#include <valgrind/helgrind.h> #include <valgrind/helgrind.h>
#include "ft/txn_manager.h"
static int static int
toku_txn_release_locks(DB_TXN* txn) { toku_txn_release_locks(DB_TXN* txn) {
...@@ -110,17 +111,17 @@ toku_txn_commit_only(DB_TXN * txn, u_int32_t flags, ...@@ -110,17 +111,17 @@ toku_txn_commit_only(DB_TXN * txn, u_int32_t flags,
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
assert_zero(r); assert_zero(r);
// Close the logger after releasing the locks
r = toku_txn_release_locks(txn);
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn; TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
TOKULOGGER logger = txn->mgrp->i->logger; TOKULOGGER logger = txn->mgrp->i->logger;
LSN do_fsync_lsn; LSN do_fsync_lsn;
BOOL do_fsync; BOOL do_fsync;
// //
// quickie fix for 5.2.0, need to extract these variables so that // quickie fix for 5.2.0, need to extract these variables so that
// we can do the fsync after the close of txn. We need to do it // we can do the fsync after the close of txn. We need to do it
// after the close because if we do it before, there are race // after the close because if we do it before, there are race
// conditions exposed by test_stress1.c (#4145, #4153) // conditions exposed by test_stress1.c (#4145, #4153) // release locks after completing the txn
//
// TODO: (Zardosht) refine this comment
// //
// Here is what was going on. In Maxwell (5.1.X), we used to // Here is what was going on. In Maxwell (5.1.X), we used to
// call toku_txn_maybe_fsync_log in between toku_txn_release_locks // call toku_txn_maybe_fsync_log in between toku_txn_release_locks
...@@ -147,6 +148,9 @@ toku_txn_commit_only(DB_TXN * txn, u_int32_t flags, ...@@ -147,6 +148,9 @@ toku_txn_commit_only(DB_TXN * txn, u_int32_t flags,
// this lock must be held until the references to the open FTs is released // this lock must be held until the references to the open FTs is released
// begin checkpoint logs these associations, so we must be protect // begin checkpoint logs these associations, so we must be protect
// the changing of these associations with checkpointing // the changing of these associations with checkpointing
// Close the logger after releasing the locks
r = toku_txn_release_locks(txn);
if (release_multi_operation_client_lock) { if (release_multi_operation_client_lock) {
toku_multi_operation_client_unlock(); toku_multi_operation_client_unlock();
} }
...@@ -505,7 +509,10 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool ...@@ -505,7 +509,10 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool
if (!holds_ydb_lock) { if (!holds_ydb_lock) {
toku_ydb_lock(); toku_ydb_lock();
} }
toku_txn_start_txn(db_txn_struct_i(result)->tokutxn); toku_txn_manager_start_txn(
toku_logger_get_txn_manager(env->i->logger),
db_txn_struct_i(result)->tokutxn
);
if (!holds_ydb_lock) { if (!holds_ydb_lock) {
toku_ydb_unlock(); toku_ydb_unlock();
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment