Commit 7324fae0 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

[t:4346] Merge 4346 to main. {{{svn merge -r 38321:38382 ../tokudb.4346}}}. Refs #4346.

git-svn-id: file:///svn/toku/tokudb@38383 c7de825b-a66e-492c-adef-691d508d4ae1
parent fab5495e
...@@ -586,7 +586,8 @@ static int toku_recover_xcommit (struct logtype_xcommit *l, RECOVER_ENV renv) { ...@@ -586,7 +586,8 @@ static int toku_recover_xcommit (struct logtype_xcommit *l, RECOVER_ENV renv) {
// commit the transaction // commit the transaction
r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, l->lsn, r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, l->lsn,
NULL, NULL); NULL, NULL,
false);
assert(r == 0); assert(r == 0);
// close the transaction // close the transaction
...@@ -610,7 +611,7 @@ static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) { ...@@ -610,7 +611,7 @@ static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) {
assert(txn!=NULL); assert(txn!=NULL);
// abort the transaction // abort the transaction
r = toku_txn_abort_with_lsn(txn, recover_yield, NULL, l->lsn, NULL, NULL); r = toku_txn_abort_with_lsn(txn, recover_yield, NULL, l->lsn, NULL, NULL, false);
assert(r == 0); assert(r == 0);
// close the transaction // close the transaction
...@@ -1148,7 +1149,7 @@ static void recover_abort_live_txns(RECOVER_ENV renv) { ...@@ -1148,7 +1149,7 @@ static void recover_abort_live_txns(RECOVER_ENV renv) {
TOKUTXN txn = (TOKUTXN) v; TOKUTXN txn = (TOKUTXN) v;
// abort the transaction // abort the transaction
r = toku_txn_abort_txn(txn, recover_yield, NULL, NULL, NULL); r = toku_txn_abort_txn(txn, recover_yield, NULL, NULL, NULL, false);
assert(r == 0); assert(r == 0);
// close the transaction // close the transaction
......
...@@ -41,7 +41,7 @@ static void test_it (int N) { ...@@ -41,7 +41,7 @@ static void test_it (int N) {
r = toku_open_brt(FILENAME, 1, &brt, 1024, 256, ct, txn, toku_builtin_compare_fun, NULL); CKERR(r); r = toku_open_brt(FILENAME, 1, &brt, 1024, 256, ct, txn, toku_builtin_compare_fun, NULL); CKERR(r);
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL, false); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
r = toku_checkpoint(ct, logger, NULL, NULL, NULL, NULL); CKERR(r); r = toku_checkpoint(ct, logger, NULL, NULL, NULL, NULL); CKERR(r);
...@@ -51,7 +51,7 @@ static void test_it (int N) { ...@@ -51,7 +51,7 @@ static void test_it (int N) {
for (int i=0; i<N; i++) { for (int i=0; i<N; i++) {
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r); r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_open_brt(FILENAME, 0, &brt, 1024, 256, ct, txn, toku_builtin_compare_fun, NULL); CKERR(r); r = toku_open_brt(FILENAME, 0, &brt, 1024, 256, ct, txn, toku_builtin_compare_fun, NULL); CKERR(r);
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL, false); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r); r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
...@@ -62,7 +62,7 @@ static void test_it (int N) { ...@@ -62,7 +62,7 @@ static void test_it (int N) {
memset(val, 'v', sizeof(val)); memset(val, 'v', sizeof(val));
val[sizeof(val)-1]=0; val[sizeof(val)-1]=0;
r = toku_brt_insert(brt, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), txn); r = toku_brt_insert(brt, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), txn);
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL, false); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -74,7 +74,7 @@ static void test_it (int N) { ...@@ -74,7 +74,7 @@ static void test_it (int N) {
for (int i=0; i<N; i++) { for (int i=0; i<N; i++) {
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r); r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_open_brt(FILENAME, 0, &brt, 1024, 256, ct, txn, toku_builtin_compare_fun, NULL); CKERR(r); r = toku_open_brt(FILENAME, 0, &brt, 1024, 256, ct, txn, toku_builtin_compare_fun, NULL); CKERR(r);
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL, false); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r); r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
...@@ -89,7 +89,7 @@ static void test_it (int N) { ...@@ -89,7 +89,7 @@ static void test_it (int N) {
assert(!is_empty); assert(!is_empty);
} }
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL, false); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -100,7 +100,7 @@ static void test_it (int N) { ...@@ -100,7 +100,7 @@ static void test_it (int N) {
} }
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r); r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_open_brt(FILENAME, 0, &brt, 1024, 256, ct, txn, toku_builtin_compare_fun, NULL); CKERR(r); r = toku_open_brt(FILENAME, 0, &brt, 1024, 256, ct, txn, toku_builtin_compare_fun, NULL); CKERR(r);
r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL); CKERR(r); r = toku_txn_commit_txn(txn, FALSE, do_yield, NULL, NULL, NULL, false); CKERR(r);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
if (0) { if (0) {
......
...@@ -48,7 +48,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -48,7 +48,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
error = toku_open_brt(fname, 1, &brt, 1<<12, 1<<9, ct, txn, test_brt_cursor_keycompare, null_db); error = toku_open_brt(fname, 1, &brt, 1<<12, 1<<9, ct, txn, test_brt_cursor_keycompare, null_db);
assert(error == 0); assert(error == 0);
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL, false);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -68,7 +68,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -68,7 +68,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
assert(error == 0); assert(error == 0);
} }
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL, false);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -114,7 +114,7 @@ test_provdel(const char *logdir, const char *fname, int n) { ...@@ -114,7 +114,7 @@ test_provdel(const char *logdir, const char *fname, int n) {
error = toku_open_brt(fname, 1, &brt, 1<<12, 1<<9, ct, txn, test_brt_cursor_keycompare, null_db); error = toku_open_brt(fname, 1, &brt, 1<<12, 1<<9, ct, txn, test_brt_cursor_keycompare, null_db);
assert(error == 0); assert(error == 0);
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL, false);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -165,11 +165,11 @@ test_provdel(const char *logdir, const char *fname, int n) { ...@@ -165,11 +165,11 @@ test_provdel(const char *logdir, const char *fname, int n) {
error = le_cursor_close(cursor); error = le_cursor_close(cursor);
assert(error == 0); assert(error == 0);
error = toku_txn_commit_txn(cursortxn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(cursortxn, TRUE, txn_yield, NULL, NULL, NULL, false);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(cursortxn); toku_txn_close_txn(cursortxn);
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL, false);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
......
...@@ -52,7 +52,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -52,7 +52,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
error = toku_open_brt(fname, 1, &brt, 1<<12, 1<<9, ct, txn, test_keycompare, null_db); error = toku_open_brt(fname, 1, &brt, 1<<12, 1<<9, ct, txn, test_keycompare, null_db);
assert(error == 0); assert(error == 0);
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL, false);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -72,7 +72,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -72,7 +72,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
assert(error == 0); assert(error == 0);
} }
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL, false);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
......
...@@ -48,7 +48,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -48,7 +48,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
error = toku_open_brt(fname, 1, &brt, 1<<12, 1<<9, ct, txn, test_brt_cursor_keycompare, null_db); error = toku_open_brt(fname, 1, &brt, 1<<12, 1<<9, ct, txn, test_brt_cursor_keycompare, null_db);
assert(error == 0); assert(error == 0);
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL, false);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
...@@ -68,7 +68,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) { ...@@ -68,7 +68,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
assert(error == 0); assert(error == 0);
} }
error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL); error = toku_txn_commit_txn(txn, TRUE, txn_yield, NULL, NULL, NULL, false);
assert(error == 0); assert(error == 0);
toku_txn_close_txn(txn); toku_txn_close_txn(txn);
......
...@@ -320,11 +320,15 @@ toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info) { ...@@ -320,11 +320,15 @@ toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info) {
return 0; return 0;
} }
// Doesn't close the txn, just performs the commit operations.
int toku_txn_commit_txn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, int toku_txn_commit_txn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
bool release_multi_operation_client_lock)
// Effect: Doesn't close the txn, just performs the commit operations.
// If release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{
return toku_txn_commit_with_lsn(txn, nosync, yield, yieldv, ZERO_LSN, return toku_txn_commit_with_lsn(txn, nosync, yield, yieldv, ZERO_LSN,
poll, poll_extra); poll, poll_extra,
release_multi_operation_client_lock);
} }
struct xcommit_info { struct xcommit_info {
...@@ -351,7 +355,10 @@ log_xcommit(void *thunk) { ...@@ -351,7 +355,10 @@ log_xcommit(void *thunk) {
} }
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn, int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
bool release_multi_operation_client_lock)
// Effect: Among other things: if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{
txn->state = TOKUTXN_COMMITTING; txn->state = TOKUTXN_COMMITTING;
if (garbage_collection_debug) { if (garbage_collection_debug) {
verify_snapshot_system(txn->logger); verify_snapshot_system(txn->logger);
...@@ -374,21 +381,29 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv ...@@ -374,21 +381,29 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv
yield(log_xcommit, &info, yieldv); yield(log_xcommit, &info, yieldv);
r = info.r; r = info.r;
} }
if (r!=0) if (r==0) {
return r; r = toku_rollback_commit(txn, yield, yieldv, oplsn);
r = toku_rollback_commit(txn, yield, yieldv, oplsn); status.commit++;
status.commit++; }
// Make sure we release that lock (even if there was an error)
if (release_multi_operation_client_lock) toku_multi_operation_client_unlock();
return r; return r;
} }
// Doesn't close the txn, just performs the abort operations.
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv, int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
return toku_txn_abort_with_lsn(txn, yield, yieldv, ZERO_LSN, poll, poll_extra); bool release_multi_operation_client_lock)
// Effect: Doesn't close the txn, just performs the abort operations.
// If release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{
return toku_txn_abort_with_lsn(txn, yield, yieldv, ZERO_LSN, poll, poll_extra, release_multi_operation_client_lock);
} }
int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn, int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
bool release_multi_operation_client_lock)
// Effect: Ammong other things, if release_multi_operation_client_lock is true, then unlock that lock (even if an error path is taken)
{
txn->state = TOKUTXN_ABORTING; txn->state = TOKUTXN_ABORTING;
if (garbage_collection_debug) { if (garbage_collection_debug) {
verify_snapshot_system(txn->logger); verify_snapshot_system(txn->logger);
...@@ -403,10 +418,12 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn, ...@@ -403,10 +418,12 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
int r = 0; int r = 0;
txn->do_fsync = FALSE; txn->do_fsync = FALSE;
r = toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64); r = toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64);
if (r!=0) if (r==0) {
return r; r = toku_rollback_abort(txn, yield, yieldv, oplsn);
r = toku_rollback_abort(txn, yield, yieldv, oplsn); status.abort++;
status.abort++; }
// Make sure we multi_operation_client_unlock release will happen even if there is an error
if (release_multi_operation_client_lock) toku_multi_operation_client_unlock();
return r; return r;
} }
......
...@@ -31,15 +31,19 @@ int toku_txn_begin_with_xid ( ...@@ -31,15 +31,19 @@ int toku_txn_begin_with_xid (
int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info); int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info);
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
bool release_multi_operation_client_lock);
BOOL toku_txn_requires_checkpoint(TOKUTXN txn); BOOL toku_txn_requires_checkpoint(TOKUTXN txn);
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn, int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
bool release_multi_operation_client_lock);
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv, int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
bool release_multi_operation_client_lock);
int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn, int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
bool release_multi_operation_client_lock);
int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync, YIELDF yield, void *yieldv); int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync, YIELDF yield, void *yieldv);
......
...@@ -267,6 +267,7 @@ BDB_DONTRUN_TESTS = \ ...@@ -267,6 +267,7 @@ BDB_DONTRUN_TESTS = \
test_dupsort_set_range_reverse \ test_dupsort_set_range_reverse \
test_large_update_broadcast_small_cachetable \ test_large_update_broadcast_small_cachetable \
test_logflush \ test_logflush \
test_multiple_checkpoints_block_commit \
test_query \ test_query \
test_multiple_checkpoints_block_commit \ test_multiple_checkpoints_block_commit \
test_redirect_func \ test_redirect_func \
......
...@@ -36,7 +36,7 @@ static void *start_txns (void *e) { ...@@ -36,7 +36,7 @@ static void *start_txns (void *e) {
CHK(env->txn_begin(env, NULL, &txn, 0)); CHK(env->txn_begin(env, NULL, &txn, 0));
CHK(db->put(db, txn, &k, &k, 0)); CHK(db->put(db, txn, &k, &k, 0));
CHK(txn->commit(txn, 0)); CHK(txn->commit(txn, 0));
if (j==10) __sync_fetch_and_add(&reader_start_count, 1); if (j==10) (void)__sync_fetch_and_add(&reader_start_count, 1);
if (j%1000==999) { printf("."); fflush(stdout); } if (j%1000==999) { printf("."); fflush(stdout); }
assert(j<1000); // Get upset if we manage to run this many transactions without the checkpoint thread assert(j<1000); // Get upset if we manage to run this many transactions without the checkpoint thread
} }
...@@ -50,7 +50,7 @@ static void start_checkpoints (void) { ...@@ -50,7 +50,7 @@ static void start_checkpoints (void) {
CHK(env->txn_checkpoint(env, 0, 0, 0)); CHK(env->txn_checkpoint(env, 0, 0, 0));
if (verbose) printf("ck\n"); if (verbose) printf("ck\n");
sched_yield(); sched_yield();
__sync_fetch_and_add(&writer_done_count, 1); (void)__sync_fetch_and_add(&writer_done_count, 1);
} }
} }
......
...@@ -237,7 +237,8 @@ env_remove_open_txn(DB_ENV *UU(env), DB_TXN *txn) { ...@@ -237,7 +237,8 @@ env_remove_open_txn(DB_ENV *UU(env), DB_TXN *txn) {
toku_list_remove((struct toku_list *) (void *) &txn->open_txns); toku_list_remove((struct toku_list *) (void *) &txn->open_txns);
} }
static int toku_txn_abort(DB_TXN * txn, TXN_PROGRESS_POLL_FUNCTION, void*); static int toku_txn_abort(DB_TXN * txn, TXN_PROGRESS_POLL_FUNCTION, void*,
bool release_multi_operation_client_lock);
static void static void
env_fs_report_in_yellow(DB_ENV *UU(env)) { env_fs_report_in_yellow(DB_ENV *UU(env)) {
...@@ -474,7 +475,7 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db ...@@ -474,7 +475,7 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db
static int toku_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte, u_int32_t min, u_int32_t flags); static int toku_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte, u_int32_t min, u_int32_t flags);
static int toku_db_close(DB * db, u_int32_t flags); static int toku_db_close(DB * db, u_int32_t flags);
static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, int internal); static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, int internal);
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags, TXN_PROGRESS_POLL_FUNCTION, void*); static int toku_txn_commit(DB_TXN * txn, u_int32_t flags, TXN_PROGRESS_POLL_FUNCTION, void*, bool release_multi_operation_client_lock);
static int db_open_iname(DB * db, DB_TXN * txn, const char *iname, u_int32_t flags, int mode); static int db_open_iname(DB * db, DB_TXN * txn, const char *iname, u_int32_t flags, int mode);
static void finalize_file_removal(DICTIONARY_ID dict_id, void * extra); static void finalize_file_removal(DICTIONARY_ID dict_id, void * extra);
...@@ -1000,7 +1001,7 @@ toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) { ...@@ -1000,7 +1001,7 @@ toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) {
assert(r==0); assert(r==0);
} }
if (using_txns) { if (using_txns) {
r = toku_txn_commit(txn, 0, NULL, NULL); r = toku_txn_commit(txn, 0, NULL, NULL, false);
assert(r==0); assert(r==0);
} }
toku_ydb_unlock(); toku_ydb_unlock();
...@@ -2702,13 +2703,14 @@ ydb_yield (voidfp f, void *fv, void *UU(v)) { ...@@ -2702,13 +2703,14 @@ ydb_yield (voidfp f, void *fv, void *UU(v)) {
static int static int
toku_txn_commit(DB_TXN * txn, u_int32_t flags, toku_txn_commit(DB_TXN * txn, u_int32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra,
bool release_multi_operation_client_lock) {
if (!txn) return EINVAL; if (!txn) return EINVAL;
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
//Recursively kill off children //Recursively kill off children
if (db_txn_struct_i(txn)->child) { if (db_txn_struct_i(txn)->child) {
//commit of child sets the child pointer to NULL //commit of child sets the child pointer to NULL
int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, flags, NULL, NULL); int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, flags, NULL, NULL, false);
if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) { if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) {
env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent commit.\n"); env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent commit.\n");
} }
...@@ -2735,13 +2737,14 @@ toku_txn_commit(DB_TXN * txn, u_int32_t flags, ...@@ -2735,13 +2737,14 @@ toku_txn_commit(DB_TXN * txn, u_int32_t flags,
// frees the tokutxn // frees the tokutxn
// Calls ydb_yield(NULL) occasionally // Calls ydb_yield(NULL) occasionally
//r = toku_logger_abort(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL); //r = toku_logger_abort(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL);
r = toku_txn_abort_txn(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL, poll, poll_extra); r = toku_txn_abort_txn(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL, poll, poll_extra,
release_multi_operation_client_lock);
else else
// frees the tokutxn // frees the tokutxn
// Calls ydb_yield(NULL) occasionally // Calls ydb_yield(NULL) occasionally
//r = toku_logger_commit(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL); //r = toku_logger_commit(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL);
r = toku_txn_commit_txn(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL, r = toku_txn_commit_txn(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL,
poll, poll_extra); poll, poll_extra, release_multi_operation_client_lock);
if (r!=0 && !toku_env_is_panicked(txn->mgrp)) { if (r!=0 && !toku_env_is_panicked(txn->mgrp)) {
env_panic(txn->mgrp, r, "Error during commit.\n"); env_panic(txn->mgrp, r, "Error during commit.\n");
...@@ -2821,12 +2824,13 @@ toku_txn_id(DB_TXN * txn) { ...@@ -2821,12 +2824,13 @@ toku_txn_id(DB_TXN * txn) {
static int static int
toku_txn_abort(DB_TXN * txn, toku_txn_abort(DB_TXN * txn,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra,
bool release_multi_operation_client_lock) {
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
//Recursively kill off children (abort or commit are both correct, commit is cheaper) //Recursively kill off children (abort or commit are both correct, commit is cheaper)
if (db_txn_struct_i(txn)->child) { if (db_txn_struct_i(txn)->child) {
//commit of child sets the child pointer to NULL //commit of child sets the child pointer to NULL
int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, DB_TXN_NOSYNC, NULL, NULL); int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, DB_TXN_NOSYNC, NULL, NULL, false);
if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) { if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) {
env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent abort.\n"); env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent abort.\n");
} }
...@@ -2845,7 +2849,7 @@ toku_txn_abort(DB_TXN * txn, ...@@ -2845,7 +2849,7 @@ toku_txn_abort(DB_TXN * txn,
assert(toku_list_empty(&db_txn_struct_i(txn)->dbs_that_must_close_before_abort)); assert(toku_list_empty(&db_txn_struct_i(txn)->dbs_that_must_close_before_abort));
//int r = toku_logger_abort(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL); //int r = toku_logger_abort(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL);
int r = toku_txn_abort_txn(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL, poll, poll_extra); int r = toku_txn_abort_txn(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL, poll, poll_extra, release_multi_operation_client_lock);
if (r!=0 && !toku_env_is_panicked(txn->mgrp)) { if (r!=0 && !toku_env_is_panicked(txn->mgrp)) {
env_panic(txn->mgrp, r, "Error during abort.\n"); env_panic(txn->mgrp, r, "Error during abort.\n");
} }
...@@ -2905,8 +2909,9 @@ locked_txn_commit_with_progress(DB_TXN *txn, u_int32_t flags, ...@@ -2905,8 +2909,9 @@ locked_txn_commit_with_progress(DB_TXN *txn, u_int32_t flags,
toku_checkpoint(txn->mgrp->i->cachetable, txn->mgrp->i->logger, NULL, NULL, NULL, NULL); toku_checkpoint(txn->mgrp->i->cachetable, txn->mgrp->i->logger, NULL, NULL, NULL, NULL);
} }
toku_multi_operation_client_lock(); //Cannot checkpoint during a commit. toku_multi_operation_client_lock(); //Cannot checkpoint during a commit.
toku_ydb_lock(); r = toku_txn_commit(txn, flags, poll, poll_extra); toku_ydb_unlock(); toku_ydb_lock();
toku_multi_operation_client_unlock(); //Cannot checkpoint during a commit. r = toku_txn_commit(txn, flags, poll, poll_extra, true); // the final 'true' says to release the multi_operation_client_lock
toku_ydb_unlock();
return r; return r;
} }
...@@ -2914,8 +2919,9 @@ static int ...@@ -2914,8 +2919,9 @@ static int
locked_txn_abort_with_progress(DB_TXN *txn, locked_txn_abort_with_progress(DB_TXN *txn,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
toku_multi_operation_client_lock(); //Cannot checkpoint during an abort. toku_multi_operation_client_lock(); //Cannot checkpoint during an abort.
toku_ydb_lock(); int r = toku_txn_abort(txn, poll, poll_extra); toku_ydb_unlock(); toku_ydb_lock();
toku_multi_operation_client_unlock(); //Cannot checkpoint during an abort. int r = toku_txn_abort(txn, poll, poll_extra, true); // the final 'true' says to release the multi_operation_client_lokc
toku_ydb_unlock();
return r; return r;
} }
...@@ -4868,11 +4874,11 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP ...@@ -4868,11 +4874,11 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP
if (using_txns) { if (using_txns) {
// close txn // close txn
if (r == 0) { // commit if (r == 0) { // commit
r = toku_txn_commit(child, DB_TXN_NOSYNC, NULL, NULL); r = toku_txn_commit(child, DB_TXN_NOSYNC, NULL, NULL, false);
invariant(r==0); // TODO panic invariant(r==0); // TODO panic
} }
else { // abort else { // abort
int r2 = toku_txn_abort(child, NULL, NULL); int r2 = toku_txn_abort(child, NULL, NULL, false);
invariant(r2==0); // TODO panic invariant(r2==0); // TODO panic
} }
} }
...@@ -5571,11 +5577,11 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna ...@@ -5571,11 +5577,11 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna
if (using_txns) { if (using_txns) {
// close txn // close txn
if (r == 0) { // commit if (r == 0) { // commit
r = toku_txn_commit(child, DB_TXN_NOSYNC, NULL, NULL); r = toku_txn_commit(child, DB_TXN_NOSYNC, NULL, NULL, false);
invariant(r==0); // TODO panic invariant(r==0); // TODO panic
} }
else { // abort else { // abort
int r2 = toku_txn_abort(child, NULL, NULL); int r2 = toku_txn_abort(child, NULL, NULL, false);
invariant(r2==0); // TODO panic invariant(r2==0); // TODO panic
} }
} }
...@@ -5692,11 +5698,11 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam ...@@ -5692,11 +5698,11 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam
if (using_txns) { if (using_txns) {
// close txn // close txn
if (r == 0) { // commit if (r == 0) { // commit
r = toku_txn_commit(child, DB_TXN_NOSYNC, NULL, NULL); r = toku_txn_commit(child, DB_TXN_NOSYNC, NULL, NULL, false);
invariant(r==0); // TODO panic invariant(r==0); // TODO panic
} }
else { // abort else { // abort
int r2 = toku_txn_abort(child, NULL, NULL); int r2 = toku_txn_abort(child, NULL, NULL, false);
invariant(r2==0); // TODO panic invariant(r2==0); // TODO panic
} }
} }
...@@ -5961,8 +5967,8 @@ toku_db_construct_autotxn(DB* db, DB_TXN **txn, BOOL* changed, BOOL force_auto_c ...@@ -5961,8 +5967,8 @@ toku_db_construct_autotxn(DB* db, DB_TXN **txn, BOOL* changed, BOOL force_auto_c
static inline int static inline int
toku_db_destruct_autotxn(DB_TXN *txn, int r, BOOL changed) { toku_db_destruct_autotxn(DB_TXN *txn, int r, BOOL changed) {
if (!changed) return r; if (!changed) return r;
if (r==0) return toku_txn_commit(txn, 0, NULL, NULL); if (r==0) return toku_txn_commit(txn, 0, NULL, NULL, false);
toku_txn_abort(txn, NULL, NULL); toku_txn_abort(txn, NULL, NULL, false);
return r; return r;
} }
...@@ -6767,11 +6773,11 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname ...@@ -6767,11 +6773,11 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname
if (using_txns) { if (using_txns) {
// close txn // close txn
if (rval == 0) { // all well so far, commit child if (rval == 0) { // all well so far, commit child
rval = toku_txn_commit(child, DB_TXN_NOSYNC, NULL, NULL); rval = toku_txn_commit(child, DB_TXN_NOSYNC, NULL, NULL, false);
assert(rval==0); assert(rval==0);
} }
else { // abort child else { // abort child
int r2 = toku_txn_abort(child, NULL, NULL); int r2 = toku_txn_abort(child, NULL, NULL, false);
assert(r2==0); assert(r2==0);
for (i=0; i<N; i++) { for (i=0; i<N; i++) {
if (new_inames_in_env[i]) { if (new_inames_in_env[i]) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment