Commit 6ad86d13 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#4455 add unlocked txn destructor refs[t:4455]

git-svn-id: file:///svn/toku/tokudb@40407 c7de825b-a66e-492c-adef-691d508d4ae1
parent c51b0dbf
......@@ -249,16 +249,17 @@ void toku_rollback_txn_close (TOKUTXN txn) {
assert(txn->spilled_rollback_tail.b == ROLLBACK_NONE.b);
assert(txn->current_rollback.b == ROLLBACK_NONE.b);
int r;
r = toku_pthread_mutex_lock(&txn->logger->txn_list_lock); assert_zero(r);
TOKULOGGER logger = txn->logger;
r = toku_pthread_mutex_lock(&logger->txn_list_lock); assert_zero(r);
{
{
//Remove txn from list (omt) of live transactions
OMTVALUE txnagain;
u_int32_t idx;
r = toku_omt_find_zero(txn->logger->live_txns, find_xid, txn, &txnagain, &idx);
r = toku_omt_find_zero(logger->live_txns, find_xid, txn, &txnagain, &idx);
assert(r==0);
assert(txn==txnagain);
r = toku_omt_delete_at(txn->logger->live_txns, idx);
r = toku_omt_delete_at(logger->live_txns, idx);
assert(r==0);
}
......@@ -266,10 +267,10 @@ void toku_rollback_txn_close (TOKUTXN txn) {
OMTVALUE txnagain;
u_int32_t idx;
//Remove txn from list of live root txns
r = toku_omt_find_zero(txn->logger->live_root_txns, find_xid, txn, &txnagain, &idx);
r = toku_omt_find_zero(logger->live_root_txns, find_xid, txn, &txnagain, &idx);
assert(r==0);
assert(txn==txnagain);
r = toku_omt_delete_at(txn->logger->live_root_txns, idx);
r = toku_omt_delete_at(logger->live_root_txns, idx);
assert(r==0);
}
//
......@@ -284,11 +285,11 @@ void toku_rollback_txn_close (TOKUTXN txn) {
u_int32_t idx;
OMTVALUE v;
//Free memory used for snapshot_txnids
r = toku_omt_find_zero(txn->logger->snapshot_txnids, toku_find_xid_by_xid, (OMTVALUE) txn->txnid64, &v, &idx);
r = toku_omt_find_zero(logger->snapshot_txnids, toku_find_xid_by_xid, (OMTVALUE) txn->txnid64, &v, &idx);
invariant(r==0);
TXNID xid = (TXNID) v;
invariant(xid == txn->txnid64);
r = toku_omt_delete_at(txn->logger->snapshot_txnids, idx);
r = toku_omt_delete_at(logger->snapshot_txnids, idx);
invariant(r==0);
}
live_list_reverse_note_txn_end(txn);
......@@ -304,34 +305,28 @@ void toku_rollback_txn_close (TOKUTXN txn) {
}
}
}
r = toku_pthread_mutex_unlock(&txn->logger->txn_list_lock); assert_zero(r);
assert(txn->logger->oldest_living_xid <= txn->txnid64);
if (txn->txnid64 == txn->logger->oldest_living_xid) {
TOKULOGGER logger = txn->logger;
r = toku_pthread_mutex_unlock(&logger->txn_list_lock); assert_zero(r);
assert(logger->oldest_living_xid <= txn->txnid64);
if (txn->txnid64 == logger->oldest_living_xid) {
OMTVALUE oldest_txnv;
r = toku_omt_fetch(logger->live_txns, 0, &oldest_txnv);
if (r==0) {
TOKUTXN oldest_txn = oldest_txnv;
assert(oldest_txn != txn); // We just removed it
assert(oldest_txn->txnid64 > txn->logger->oldest_living_xid); //Must be newer than the previous oldest
txn->logger->oldest_living_xid = oldest_txn->txnid64;
txn->logger->oldest_living_starttime = oldest_txn->starttime;
assert(oldest_txn->txnid64 > logger->oldest_living_xid); //Must be newer than the previous oldest
logger->oldest_living_xid = oldest_txn->txnid64;
logger->oldest_living_starttime = oldest_txn->starttime;
}
else {
//No living transactions
assert(r==EINVAL);
txn->logger->oldest_living_xid = TXNID_NONE_LIVING;
txn->logger->oldest_living_starttime = 0;
logger->oldest_living_xid = TXNID_NONE_LIVING;
logger->oldest_living_starttime = 0;
}
}
note_txn_closing(txn);
xids_destroy(&txn->xids);
toku_txn_ignore_free(txn); // 2954
toku_free(txn);
return;
}
void* toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size) {
......@@ -816,7 +811,6 @@ static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv) {
// for every BRT in txn, remove it.
static void note_txn_closing (TOKUTXN txn) {
toku_omt_iterate(txn->open_brts, remove_txn, txn);
toku_omt_destroy(&txn->open_brts);
}
// Return the number of bytes that went into the rollback data structure (the uncompressed count if there is compression)
......
......@@ -488,19 +488,26 @@ void toku_txn_get_fsync_info(TOKUTXN ttxn, BOOL* do_fsync, LSN* do_fsync_lsn) {
*do_fsync_lsn = ttxn->do_fsync_lsn;
}
void toku_txn_close_txn(TOKUTXN txn) {
TOKULOGGER logger = txn->logger;
toku_txn_rollback_txn(txn);
toku_txn_destroy_txn(txn);
}
void toku_txn_rollback_txn(TOKUTXN txn) {
toku_rollback_txn_close(txn);
txn = NULL; // txn is no longer valid
}
void toku_txn_destroy_txn(TOKUTXN txn) {
if (garbage_collection_debug)
verify_snapshot_system(logger);
verify_snapshot_system(txn->logger);
toku_omt_destroy(&txn->open_brts);
xids_destroy(&txn->xids);
toku_txn_ignore_free(txn); // 2954
toku_free(txn);
STATUS_VALUE(TXN_CLOSE)++;
STATUS_VALUE(TXN_NUM_OPEN)--;
return;
}
XIDS toku_txn_get_xids (TOKUTXN txn) {
......@@ -777,4 +784,12 @@ toku_txn_get_state(TOKUTXN txn) {
return txn->state;
}
#include <valgrind/drd.h>
void __attribute__((__constructor__)) toku_txn_drd_ignore(void);
void
toku_txn_drd_ignore(void) {
DRD_IGNORE_VAR(txn_status);
}
#undef STATUS_VALUE
......@@ -49,8 +49,16 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync, YIELDF yield, void *yieldv);
void toku_txn_get_fsync_info(TOKUTXN ttxn, BOOL* do_fsync, LSN* do_fsync_lsn);
// Rollback and destroy a txn
void toku_txn_close_txn(TOKUTXN txn);
// Remove the txn from any live txn lists
void toku_txn_rollback_txn(TOKUTXN txn);
// Free the memory of a txn
void toku_txn_destroy_txn(TOKUTXN txn);
XIDS toku_txn_get_xids (TOKUTXN);
// Returns TRUE if a is older than b
......
......@@ -9,18 +9,6 @@
#include "log_header.h"
#include "ydb_txn.h"
// add a txn to the list of open txn's
static void
env_add_open_txn(DB_ENV *env, DB_TXN *txn UU()) {
(void) __sync_fetch_and_add(&env->i->open_txns, 1);
}
// remove a txn from the list of open txn's
static void
env_remove_open_txn(DB_ENV *UU(env), DB_TXN *txn UU()) {
(void) __sync_fetch_and_sub(&env->i->open_txns, 1);
}
static int
toku_txn_release_locks(DB_TXN* txn) {
assert(txn);
......@@ -58,8 +46,18 @@ ydb_yield (voidfp f, void *fv, void *UU(v)) {
toku_ydb_lock();
}
int
toku_txn_commit(DB_TXN * txn, u_int32_t flags,
static void
toku_txn_destroy(DB_TXN *txn) {
(void) __sync_fetch_and_sub(&txn->mgrp->i->open_txns, 1);
toku_txn_destroy_txn(db_txn_struct_i(txn)->tokutxn);
#if !TOKUDB_NATIVE_H
toku_free(db_txn_struct_i(txn));
#endif
toku_free(txn);
}
static int
toku_txn_commit_only(DB_TXN * txn, u_int32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra,
bool release_multi_operation_client_lock) {
if (!txn) return EINVAL;
......@@ -80,7 +78,7 @@ toku_txn_commit(DB_TXN * txn, u_int32_t flags,
assert(db_txn_struct_i(txn->parent)->child == txn);
db_txn_struct_i(txn->parent)->child=NULL;
}
env_remove_open_txn(txn->mgrp, txn);
//toku_ydb_notef("flags=%d\n", flags);
if (flags & DB_TXN_SYNC) {
toku_txn_force_fsync_on_commit(db_txn_struct_i(txn)->tokutxn);
......@@ -90,18 +88,17 @@ toku_txn_commit(DB_TXN * txn, u_int32_t flags,
flags &= ~DB_TXN_NOSYNC;
int r;
if (flags!=0)
if (flags!=0) {
// frees the tokutxn
// Calls ydb_yield(NULL) occasionally
//r = toku_logger_abort(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL);
r = toku_txn_abort_txn(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL, poll, poll_extra,
release_multi_operation_client_lock);
else
} else {
// frees the tokutxn
// Calls ydb_yield(NULL) occasionally
//r = toku_logger_commit(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL);
r = toku_txn_commit_txn(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL,
poll, poll_extra, release_multi_operation_client_lock);
}
if (r!=0 && !toku_env_is_panicked(txn->mgrp)) {
env_panic(txn->mgrp, r, "Error during commit.\n");
......@@ -142,11 +139,9 @@ toku_txn_commit(DB_TXN * txn, u_int32_t flags,
// in the test_stress tests.
//
toku_txn_get_fsync_info(ttxn, &do_fsync, &do_fsync_lsn);
toku_txn_close_txn(ttxn);
toku_txn_rollback_txn(ttxn);
toku_txn_maybe_fsync_log(logger, do_fsync_lsn, do_fsync, ydb_yield, NULL);
// the toxutxn is freed, and we must free the rest. */
//Promote list to parent (dbs that must close before abort)
if (txn->parent) {
//Combine lists.
......@@ -162,15 +157,19 @@ toku_txn_commit(DB_TXN * txn, u_int32_t flags,
}
}
// The txn is no good after the commit even if the commit fails, so free it up.
#if !TOKUDB_NATIVE_H
toku_free(db_txn_struct_i(txn));
#endif
toku_free(txn); txn = NULL;
if (flags!=0) return EINVAL;
return r;
}
int
toku_txn_commit(DB_TXN * txn, u_int32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra,
bool release_multi_operation_client_lock) {
int r = toku_txn_commit_only(txn, flags, poll, poll_extra, release_multi_operation_client_lock);
toku_txn_destroy(txn);
return r;
}
static u_int32_t
toku_txn_id(DB_TXN * txn) {
HANDLE_PANICKED_ENV(txn->mgrp);
......@@ -179,8 +178,8 @@ toku_txn_id(DB_TXN * txn) {
return -1;
}
int
toku_txn_abort(DB_TXN * txn,
static int
toku_txn_abort_only(DB_TXN * txn,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra,
bool release_multi_operation_client_lock) {
HANDLE_PANICKED_ENV(txn->mgrp);
......@@ -200,12 +199,10 @@ toku_txn_abort(DB_TXN * txn,
assert(db_txn_struct_i(txn->parent)->child == txn);
db_txn_struct_i(txn->parent)->child=NULL;
}
env_remove_open_txn(txn->mgrp, txn);
//All dbs that must close before abort, must now be closed
assert(toku_list_empty(&db_txn_struct_i(txn)->dbs_that_must_close_before_abort));
//int r = toku_logger_abort(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL);
int r = toku_txn_abort_txn(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL, poll, poll_extra, release_multi_operation_client_lock);
if (r!=0 && !toku_env_is_panicked(txn->mgrp)) {
env_panic(txn->mgrp, r, "Error during abort.\n");
......@@ -213,13 +210,16 @@ toku_txn_abort(DB_TXN * txn,
HANDLE_PANICKED_ENV(txn->mgrp);
assert_zero(r);
r = toku_txn_release_locks(txn);
//toku_logger_txn_close(db_txn_struct_i(txn)->tokutxn);
toku_txn_close_txn(db_txn_struct_i(txn)->tokutxn);
toku_txn_rollback_txn(db_txn_struct_i(txn)->tokutxn);
return r;
}
#if !TOKUDB_NATIVE_H
toku_free(db_txn_struct_i(txn));
#endif
toku_free(txn);
int
toku_txn_abort(DB_TXN * txn,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra,
bool release_multi_operation_client_lock) {
int r = toku_txn_abort_only(txn, poll, poll_extra, release_multi_operation_client_lock);
toku_txn_destroy(txn);
return r;
}
......@@ -233,7 +233,10 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
static u_int32_t
locked_txn_id(DB_TXN *txn) {
toku_ydb_lock(); u_int32_t r = toku_txn_id(txn); toku_ydb_unlock(); return r;
toku_ydb_lock();
u_int32_t r = toku_txn_id(txn);
toku_ydb_unlock();
return r;
}
static int
......@@ -244,7 +247,10 @@ toku_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) {
static int
locked_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) {
toku_ydb_lock(); int r = toku_txn_stat(txn, txn_stat); toku_ydb_unlock(); return r;
toku_ydb_lock();
int r = toku_txn_stat(txn, txn_stat);
toku_ydb_unlock();
return r;
}
static int
......@@ -270,8 +276,9 @@ locked_txn_commit_with_progress(DB_TXN *txn, u_int32_t flags,
}
toku_multi_operation_client_lock(); //Cannot checkpoint during a commit.
toku_ydb_lock();
r = toku_txn_commit(txn, flags, poll, poll_extra, true); // the final 'true' says to release the multi_operation_client_lock
r = toku_txn_commit_only(txn, flags, poll, poll_extra, true); // the final 'true' says to release the multi_operation_client_lock
toku_ydb_unlock();
toku_txn_destroy(txn);
return r;
}
......@@ -280,22 +287,21 @@ locked_txn_abort_with_progress(DB_TXN *txn,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
toku_multi_operation_client_lock(); //Cannot checkpoint during an abort.
toku_ydb_lock();
int r = toku_txn_abort(txn, poll, poll_extra, true); // the final 'true' says to release the multi_operation_client_lokc
int r = toku_txn_abort_only(txn, poll, poll_extra, true); // the final 'true' says to release the multi_operation_client_lock
toku_ydb_unlock();
toku_txn_destroy(txn);
return r;
}
int
locked_txn_commit(DB_TXN *txn, u_int32_t flags) {
int r;
r = locked_txn_commit_with_progress(txn, flags, NULL, NULL);
int r = locked_txn_commit_with_progress(txn, flags, NULL, NULL);
return r;
}
int
locked_txn_abort(DB_TXN *txn) {
int r;
r = locked_txn_abort_with_progress(txn, NULL, NULL);
int r = locked_txn_abort_with_progress(txn, NULL, NULL);
return r;
}
......@@ -449,7 +455,9 @@ toku_txn_begin_internal(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t fla
assert(!db_txn_struct_i(result->parent)->child);
db_txn_struct_i(result->parent)->child = result;
}
env_add_open_txn(env, result);
(void) __sync_fetch_and_add(&env->i->open_txns, 1);
*txn = result;
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment