Commit 603cf574, authored by Rich Prohaska, committed by Yoni Fogel

#3590: log the commit, process the rollback, release locks, then fsync the commit log entry. Closes [t:3590]

git-svn-id: file:///svn/toku/tokudb@33034 c7de825b-a66e-492c-adef-691d508d4ae1
parent f1e49321
......@@ -156,6 +156,8 @@ struct tokutxn {
struct toku_list checkpoint_before_commit;
TXN_IGNORE_S ignore_errors; // 2954
TOKUTXN_STATE state;
LSN do_fsync_lsn;
BOOL do_fsync;
};
struct txninfo {
......
......@@ -167,9 +167,9 @@ toku_logger_open_rollback(TOKULOGGER logger, CACHETABLE cachetable, BOOL create)
BRT t = NULL; // Note, there is no DB associated with this BRT.
r = toku_brt_create(&t);
assert(r==0);
assert_zero(r);
r = toku_brt_open(t, ROLLBACK_CACHEFILE_NAME, create, create, cachetable, NULL_TXN, NULL);
assert(r==0);
assert_zero(r);
logger->rollback_cachefile = t->cf;
toku_brtheader_lock(t->h);
//Verify it is empty
......@@ -199,7 +199,7 @@ toku_logger_close_rollback(TOKULOGGER logger, BOOL recovery_failed) {
toku_brtheader_lock(h);
if (!h->panic && recovery_failed) {
r = toku_brt_header_set_panic(h, EINVAL, "Recovery failed");
assert(r==0);
assert_zero(r);
}
//Verify it is safe to close it.
if (!h->panic) { //If paniced, it is safe to close.
......@@ -322,7 +322,7 @@ wait_till_output_available (TOKULOGGER logger)
{
while (!logger->output_is_available) {
int r = toku_pthread_cond_wait(&logger->output_condition, &logger->output_condition_lock);
assert(r==0);
assert_zero(r);
}
}
......@@ -333,7 +333,7 @@ grab_output(TOKULOGGER logger, LSN *fsynced_lsn)
// Exit: Hold permission to modify output (but none of the locks).
{
int r;
r = toku_pthread_mutex_lock(&logger->output_condition_lock); assert(r==0);
r = toku_pthread_mutex_lock(&logger->output_condition_lock); assert_zero(r);
logger->output_condition_lock_ctr++;
wait_till_output_available(logger);
logger->output_is_available = FALSE;
......@@ -341,7 +341,7 @@ grab_output(TOKULOGGER logger, LSN *fsynced_lsn)
*fsynced_lsn = logger->fsynced_lsn;
}
logger->output_condition_lock_ctr++;
r = toku_pthread_mutex_unlock(&logger->output_condition_lock); assert(r==0);
r = toku_pthread_mutex_unlock(&logger->output_condition_lock); assert_zero(r);
}
static BOOL
......@@ -354,7 +354,7 @@ wait_till_output_already_written_or_output_buffer_available (TOKULOGGER logger,
// Exit: Hold the output permission if returns false.
{
BOOL result;
{ int r = toku_pthread_mutex_lock(&logger->output_condition_lock); logger->output_condition_lock_ctr++; assert(r==0); }
{ int r = toku_pthread_mutex_lock(&logger->output_condition_lock); logger->output_condition_lock_ctr++; assert_zero(r); }
while (1) {
if (logger->fsynced_lsn.lsn >= lsn.lsn) { // we can look at the fsynced lsn since we have the lock.
result = TRUE;
......@@ -367,10 +367,10 @@ wait_till_output_already_written_or_output_buffer_available (TOKULOGGER logger,
}
// otherwise wait for a good time to look again.
int r = toku_pthread_cond_wait(&logger->output_condition, &logger->output_condition_lock);
assert(r==0);
assert_zero(r);
}
*fsynced_lsn = logger->fsynced_lsn;
{ logger->output_condition_lock_ctr++; int r = toku_pthread_mutex_unlock(&logger->output_condition_lock); assert(r==0); }
{ logger->output_condition_lock_ctr++; int r = toku_pthread_mutex_unlock(&logger->output_condition_lock); assert_zero(r); }
return result;
}
......@@ -381,15 +381,15 @@ release_output (TOKULOGGER logger, LSN fsynced_lsn)
// Exit: Holds neither locks nor output permission.
{
int r;
r = toku_pthread_mutex_lock(&logger->output_condition_lock); assert(r==0);
r = toku_pthread_mutex_lock(&logger->output_condition_lock); assert_zero(r);
logger->output_condition_lock_ctr++;
logger->output_is_available = TRUE;
if (logger->fsynced_lsn.lsn < fsynced_lsn.lsn) {
logger->fsynced_lsn = fsynced_lsn;
}
r = toku_pthread_cond_broadcast(&logger->output_condition); assert(r==0);
r = toku_pthread_cond_broadcast(&logger->output_condition); assert_zero(r);
logger->output_condition_lock_ctr++;
r = toku_pthread_mutex_unlock(&logger->output_condition_lock); assert(r==0);
r = toku_pthread_mutex_unlock(&logger->output_condition_lock); assert_zero(r);
}
static void
......@@ -420,7 +420,7 @@ write_outbuf_to_logfile (TOKULOGGER logger, LSN *fsynced_lsn)
// If the file got too big, then open a new file.
if (logger->n_in_file > logger->lg_max) {
int r = close_and_open_logfile(logger, fsynced_lsn);
assert(r==0);
assert_zero(r);
}
}
......@@ -482,7 +482,7 @@ int toku_logger_fsync (TOKULOGGER logger)
{
int r;
if (logger->is_panicked) return EINVAL;
r = ml_lock(&logger->input_lock); assert(r==0);
r = ml_lock(&logger->input_lock); assert_zero(r);
logger->input_lock_ctr++;
r = toku_logger_maybe_fsync(logger, logger->inbuf.max_lsn_in_buf, TRUE);
if (r!=0) {
......@@ -496,7 +496,7 @@ toku_logger_fsync_if_lsn_not_fsynced (TOKULOGGER logger, LSN lsn) {
int r = 0;
if (logger->is_panicked) r = EINVAL;
else if (logger->write_log_files) {
r = ml_lock(&logger->input_lock); assert(r==0);
r = ml_lock(&logger->input_lock); assert_zero(r);
logger->input_lock_ctr++;
r = toku_logger_maybe_fsync(logger, lsn, TRUE);
if (r!=0) {
......@@ -749,21 +749,21 @@ int toku_logger_maybe_fsync (TOKULOGGER logger, LSN lsn, int do_fsync)
if (do_fsync) {
// reacquire the locks (acquire output permission first)
logger->input_lock_ctr++;
r = ml_unlock(&logger->input_lock); assert(r==0);
r = ml_unlock(&logger->input_lock); assert_zero(r);
LSN fsynced_lsn;
BOOL already_done = wait_till_output_already_written_or_output_buffer_available(logger, lsn, &fsynced_lsn);
if (already_done) return 0;
// otherwise we now own the output permission, and our lsn isn't outputed.
r = ml_lock(&logger->input_lock); assert(r==0);
r = ml_lock(&logger->input_lock); assert_zero(r);
logger->input_lock_ctr++;
swap_inbuf_outbuf(logger);
logger->input_lock_ctr++;
r = ml_unlock(&logger->input_lock); // release the input lock now, so other threads can fill the inbuf. (Thus enabling group commit.)
assert(r==0);
assert_zero(r);
write_outbuf_to_logfile(logger, &fsynced_lsn);
if (fsynced_lsn.lsn < lsn.lsn) {
......@@ -785,7 +785,7 @@ int toku_logger_maybe_fsync (TOKULOGGER logger, LSN lsn, int do_fsync)
} else {
logger->input_lock_ctr++;
r = ml_unlock(&logger->input_lock);
assert(r==0);
assert_zero(r);
}
return 0;
}
......@@ -798,7 +798,7 @@ toku_logger_write_buffer (TOKULOGGER logger, LSN *fsynced_lsn)
// Note: Only called during single-threaded activity from toku_logger_restart, so locks aren't really needed.
{
swap_inbuf_outbuf(logger);
{ logger->input_lock_ctr++; int r = ml_unlock(&logger->input_lock); assert(r==0); }
{ logger->input_lock_ctr++; int r = ml_unlock(&logger->input_lock); assert_zero(r); }
write_outbuf_to_logfile(logger, fsynced_lsn);
if (logger->write_log_files) {
int r = toku_file_fsync_without_accounting(logger->fd);
......
......@@ -164,7 +164,6 @@ toku_logger_maybe_fsync (TOKULOGGER logger, LSN lsn, int do_fsync);
// fsync
// release the outlock
typedef struct logger_status {
u_int64_t ilock_ctr;
u_int64_t olock_ctr;
......@@ -179,7 +178,7 @@ int toku_get_version_of_logs_on_disk(const char *log_dir, BOOL *found_any_logs,
int toku_delete_all_logs_of_version(const char *log_dir, uint32_t version_to_delete);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
}
#endif
#endif
......
......@@ -80,7 +80,7 @@ setup_live_root_txn_list(TOKUTXN txn) {
xidsp[i] = &xids[i];
}
r = toku_omt_iterate(global, fill_xids, xids);
invariant(r==0);
assert_zero(r);
r = toku_omt_create_steal_sorted_array(&txn->live_root_txn_list, &xidsp, num, num);
return r;
......@@ -96,7 +96,7 @@ snapshot_txnids_note_txn(TOKUTXN txn) {
TXNID *XMALLOC(xid);
*xid = txn->txnid64;
r = toku_omt_insert_at(txnids, xid, toku_omt_size(txnids));
invariant(r==0);
assert_zero(r);
return r;
}
......@@ -128,7 +128,7 @@ live_list_reverse_note_txn_start_iter(OMTVALUE live_xidv, u_int32_t UU(index), v
pair->xid1 = *live_xid;
pair->xid2 = txn->txnid64;
r = toku_omt_insert_at(reverse, pair, idx);
invariant(r==0);
assert_zero(r);
}
return r;
}
......@@ -146,7 +146,7 @@ live_list_reverse_note_txn_start(TOKUTXN txn) {
int r;
r = toku_omt_iterate(txn->live_root_txn_list, live_list_reverse_note_txn_start_iter, txn);
invariant(r==0);
assert_zero(r);
return r;
}
......@@ -246,12 +246,12 @@ int toku_txn_begin_with_xid (
// is a child transaction that specifically asked for its own snapshot
if (parent_tokutxn==NULL || snapshot_type == TXN_SNAPSHOT_CHILD) {
r = setup_live_root_txn_list(result);
invariant(r==0);
assert_zero(r);
result->snapshot_txnid64 = result->txnid64;
r = snapshot_txnids_note_txn(result);
invariant(r==0);
assert_zero(r);
r = live_list_reverse_note_txn_start(result);
invariant(r==0);
assert_zero(r);
}
// in this case, it is a child transaction that specified its snapshot to be that
// of the root transaction
......@@ -270,6 +270,7 @@ int toku_txn_begin_with_xid (
result->recovered_from_checkpoint = FALSE;
toku_list_init(&result->checkpoint_before_commit);
result->state = TOKUTXN_LIVE;
result->do_fsync = FALSE;
// 2954
r = toku_txn_ignore_init(result);
......@@ -297,7 +298,7 @@ toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info) {
for (i = 0; i < info->num_brts; i++) {
BRT brt = info->open_brts[i];
int r = toku_txn_note_brt(txn, brt);
assert(r==0);
assert_zero(r);
}
COPY_FROM_INFO(force_fsync_on_commit );
COPY_FROM_INFO(num_rollback_nodes);
......@@ -329,7 +330,6 @@ int toku_txn_commit_txn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv,
struct xcommit_info {
int r;
TOKUTXN txn;
int do_fsync;
};
//Called during a yield (ydb lock NOT held).
......@@ -360,12 +360,12 @@ local_checkpoints_and_log_xcommit(void *thunk) {
CACHETABLE ct = toku_cachefile_get_cachetable(cachefiles[0]);
int r = toku_cachetable_local_checkpoint_for_commit(ct, txn, num_cachefiles, cachefiles);
assert(r==0);
assert_zero(r);
toku_free(cachefiles);
toku_poll_txn_progress_function(txn, TRUE, FALSE);
}
info->r = toku_log_xcommit(txn->logger, (LSN*)0, info->do_fsync, txn->txnid64); // exits holding neither of the tokulogger locks.
info->r = toku_log_xcommit(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64); // exits holding neither of the tokulogger locks.
}
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn,
......@@ -377,8 +377,9 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv
int r;
// panic handled in log_commit
//Child transactions do not actually 'commit'. They promote their changes to parent, so no need to fsync if this txn has a parent.
int do_fsync = !txn->parent && (txn->force_fsync_on_commit || (!nosync && txn->num_rollentries>0));
// Child transactions do not actually 'commit'. They promote their changes to parent, so no need to fsync if this txn has a parent.
// the do_sync state is captured in the txn for txn_close_txn later
txn->do_fsync = !txn->parent && (txn->force_fsync_on_commit || (!nosync && txn->num_rollentries>0));
txn->progress_poll_fun = poll;
txn->progress_poll_fun_extra = poll_extra;
......@@ -387,7 +388,6 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv
struct xcommit_info info = {
.r = 0,
.txn = txn,
.do_fsync = do_fsync
};
yield(local_checkpoints_and_log_xcommit, &info, yieldv);
r = info.r;
......@@ -419,7 +419,7 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
txn->progress_poll_fun = poll;
txn->progress_poll_fun_extra = poll_extra;
int r=0;
r = toku_log_xabort(txn->logger, (LSN*)0, 0, txn->txnid64);
r = toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64);
if (r!=0)
return r;
r = toku_rollback_abort(txn, yield, yieldv, oplsn);
......@@ -428,10 +428,18 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
}
void toku_txn_close_txn(TOKUTXN txn) {
TOKULOGGER logger = txn->logger;
toku_rollback_txn_close(txn);
if (garbage_collection_debug) {
TOKULOGGER logger = txn->logger; // capture these for the fsync after the txn is deleted
bool do_fsync = txn->do_fsync;
LSN lsn = txn->do_fsync_lsn;
toku_rollback_txn_close(txn);
txn = NULL; // txn is no longer valid
if (garbage_collection_debug)
verify_snapshot_system(logger);
if (do_fsync) { // #3590
int r = toku_logger_fsync_if_lsn_not_fsynced(logger, lsn);
assert_zero(r);
}
status.close++;
return;
......@@ -464,7 +472,7 @@ TXNID toku_get_oldest_in_live_root_txn_list(TOKUTXN txn) {
OMTVALUE v;
int r;
r = toku_omt_fetch(omt, 0, &v, NULL);
invariant(r==0);
assert_zero(r);
TXNID *xidp = v;
return *xidp;
}
......@@ -512,19 +520,19 @@ verify_snapshot_system(TOKULOGGER logger) {
for (i = 0; i < num_snapshot_txnids; i++) {
OMTVALUE v;
r = toku_omt_fetch(logger->snapshot_txnids, i, &v, NULL);
invariant(r==0);
assert_zero(r);
snapshot_txnids[i] = *(TXNID*)v;
}
for (i = 0; i < num_live_txns; i++) {
OMTVALUE v;
r = toku_omt_fetch(logger->live_txns, i, &v, NULL);
invariant(r==0);
assert_zero(r);
live_txns[i] = v;
}
for (i = 0; i < num_live_list_reverse; i++) {
OMTVALUE v;
r = toku_omt_fetch(logger->live_list_reverse, i, &v, NULL);
invariant(r==0);
assert_zero(r);
live_list_reverse[i] = v;
}
......@@ -535,14 +543,14 @@ verify_snapshot_system(TOKULOGGER logger) {
invariant(is_txnid_live(logger, snapshot_xid));
TOKUTXN snapshot_txn;
r = toku_txnid2txn(logger, snapshot_xid, &snapshot_txn);
invariant(r==0);
assert_zero(r);
int num_live_root_txn_list = toku_omt_size(snapshot_txn->live_root_txn_list);
TXNID live_root_txn_list[num_live_root_txn_list];
{
for (j = 0; j < num_live_root_txn_list; j++) {
OMTVALUE v;
r = toku_omt_fetch(snapshot_txn->live_root_txn_list, j, &v, NULL);
invariant(r==0);
assert_zero(r);
live_root_txn_list[j] = *(TXNID*)v;
}
}
......@@ -571,7 +579,7 @@ verify_snapshot_system(TOKULOGGER logger) {
r = toku_omt_find_zero(logger->snapshot_txnids,
toku_find_xid_by_xid,
&pair->xid2, &v2, &index, NULL);
invariant(r==0);
assert_zero(r);
}
for (j = 0; j < num_live_txns; j++) {
TOKUTXN txn = live_txns[j];
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment