Commit caa80090 authored by Dave Chinner's avatar Dave Chinner Committed by Darrick J. Wong

xfs: attach iclog callbacks in xlog_cil_set_ctx_write_state()

Now that we have a mechanism to guarantee that the callbacks
attached to an iclog are owned by the context that attaches them
until they drop their reference to the iclog via
xlog_state_release_iclog(), we can attach callbacks to the iclog at
any time we have an active reference to the iclog.

xlog_state_get_iclog_space() always guarantees that the commit
record will fit in the iclog it returns, so we can move this IO
callback setting to xlog_cil_set_ctx_write_state(), record the
commit iclog in the context and remove the need for the commit iclog
to be returned by xlog_write() altogether.

This, in turn, allows us to move the wakeup for ordered commit
record writes up into xlog_cil_set_ctx_write_state(), too, because
we have been guaranteed that this commit record will be physically
located in the iclog before any waiting commit record at a higher
sequence number will be granted iclog space.

This further cleans up the post commit record write processing in
the CIL push code, especially as xlog_state_release_iclog() will now
clean up the context when shutdown errors occur.
Signed-off-by: default avatarDave Chinner <dchinner@redhat.com>
Reviewed-by: default avatarChristoph Hellwig <hch@lst.de>
Reviewed-by: default avatarDarrick J. Wong <djwong@kernel.org>
Signed-off-by: default avatarDarrick J. Wong <djwong@kernel.org>
parent bf034bc8
...@@ -933,7 +933,7 @@ xlog_write_unmount_record( ...@@ -933,7 +933,7 @@ xlog_write_unmount_record(
/* account for space used by record data */ /* account for space used by record data */
ticket->t_curr_res -= sizeof(ulf); ticket->t_curr_res -= sizeof(ulf);
return xlog_write(log, NULL, &vec, ticket, NULL, XLOG_UNMOUNT_TRANS); return xlog_write(log, NULL, &vec, ticket, XLOG_UNMOUNT_TRANS);
} }
/* /*
...@@ -2380,8 +2380,7 @@ xlog_write_copy_finish( ...@@ -2380,8 +2380,7 @@ xlog_write_copy_finish(
int *data_cnt, int *data_cnt,
int *partial_copy, int *partial_copy,
int *partial_copy_len, int *partial_copy_len,
int log_offset, int log_offset)
struct xlog_in_core **commit_iclog)
{ {
int error; int error;
...@@ -2400,27 +2399,20 @@ xlog_write_copy_finish( ...@@ -2400,27 +2399,20 @@ xlog_write_copy_finish(
*partial_copy = 0; *partial_copy = 0;
*partial_copy_len = 0; *partial_copy_len = 0;
if (iclog->ic_size - log_offset <= sizeof(xlog_op_header_t)) { if (iclog->ic_size - log_offset > sizeof(xlog_op_header_t))
/* no more space in this iclog - push it. */ return 0;
spin_lock(&log->l_icloglock);
xlog_state_finish_copy(log, iclog, *record_cnt, *data_cnt);
*record_cnt = 0;
*data_cnt = 0;
if (iclog->ic_state == XLOG_STATE_ACTIVE)
xlog_state_switch_iclogs(log, iclog, 0);
else
ASSERT(iclog->ic_state == XLOG_STATE_WANT_SYNC ||
xlog_is_shutdown(log));
if (!commit_iclog)
goto release_iclog;
spin_unlock(&log->l_icloglock);
ASSERT(flags & XLOG_COMMIT_TRANS);
*commit_iclog = iclog;
}
return 0; /* no more space in this iclog - push it. */
spin_lock(&log->l_icloglock);
xlog_state_finish_copy(log, iclog, *record_cnt, *data_cnt);
*record_cnt = 0;
*data_cnt = 0;
if (iclog->ic_state == XLOG_STATE_ACTIVE)
xlog_state_switch_iclogs(log, iclog, 0);
else
ASSERT(iclog->ic_state == XLOG_STATE_WANT_SYNC ||
xlog_is_shutdown(log));
release_iclog: release_iclog:
error = xlog_state_release_iclog(log, iclog, 0); error = xlog_state_release_iclog(log, iclog, 0);
spin_unlock(&log->l_icloglock); spin_unlock(&log->l_icloglock);
...@@ -2473,7 +2465,6 @@ xlog_write( ...@@ -2473,7 +2465,6 @@ xlog_write(
struct xfs_cil_ctx *ctx, struct xfs_cil_ctx *ctx,
struct xfs_log_vec *log_vector, struct xfs_log_vec *log_vector,
struct xlog_ticket *ticket, struct xlog_ticket *ticket,
struct xlog_in_core **commit_iclog,
uint optype) uint optype)
{ {
struct xlog_in_core *iclog = NULL; struct xlog_in_core *iclog = NULL;
...@@ -2602,8 +2593,7 @@ xlog_write( ...@@ -2602,8 +2593,7 @@ xlog_write(
&record_cnt, &data_cnt, &record_cnt, &data_cnt,
&partial_copy, &partial_copy,
&partial_copy_len, &partial_copy_len,
log_offset, log_offset);
commit_iclog);
if (error) if (error)
return error; return error;
...@@ -2641,12 +2631,7 @@ xlog_write( ...@@ -2641,12 +2631,7 @@ xlog_write(
spin_lock(&log->l_icloglock); spin_lock(&log->l_icloglock);
xlog_state_finish_copy(log, iclog, record_cnt, data_cnt); xlog_state_finish_copy(log, iclog, record_cnt, data_cnt);
if (commit_iclog) { error = xlog_state_release_iclog(log, iclog, 0);
ASSERT(optype & XLOG_COMMIT_TRANS);
*commit_iclog = iclog;
} else {
error = xlog_state_release_iclog(log, iclog, 0);
}
spin_unlock(&log->l_icloglock); spin_unlock(&log->l_icloglock);
return error; return error;
......
...@@ -646,11 +646,41 @@ xlog_cil_set_ctx_write_state( ...@@ -646,11 +646,41 @@ xlog_cil_set_ctx_write_state(
xfs_lsn_t lsn = be64_to_cpu(iclog->ic_header.h_lsn); xfs_lsn_t lsn = be64_to_cpu(iclog->ic_header.h_lsn);
ASSERT(!ctx->commit_lsn); ASSERT(!ctx->commit_lsn);
spin_lock(&cil->xc_push_lock); if (!ctx->start_lsn) {
if (!ctx->start_lsn) spin_lock(&cil->xc_push_lock);
ctx->start_lsn = lsn; ctx->start_lsn = lsn;
else spin_unlock(&cil->xc_push_lock);
ctx->commit_lsn = lsn; return;
}
/*
* Take a reference to the iclog for the context so that we still hold
* it when xlog_write is done and has released it. This means the
* context controls when the iclog is released for IO.
*/
atomic_inc(&iclog->ic_refcnt);
/*
* xlog_state_get_iclog_space() guarantees there is enough space in the
* iclog for an entire commit record, so we can attach the context
* callbacks now. This needs to be done before we make the commit_lsn
* visible to waiters so that checkpoints with commit records in the
* same iclog order their IO completion callbacks in the same order that
* the commit records appear in the iclog.
*/
spin_lock(&cil->xc_log->l_icloglock);
list_add_tail(&ctx->iclog_entry, &iclog->ic_callbacks);
spin_unlock(&cil->xc_log->l_icloglock);
/*
* Now we can record the commit LSN and wake anyone waiting for this
* sequence to have the ordered commit record assigned to a physical
* location in the log.
*/
spin_lock(&cil->xc_push_lock);
ctx->commit_iclog = iclog;
ctx->commit_lsn = lsn;
wake_up_all(&cil->xc_commit_wait);
spin_unlock(&cil->xc_push_lock); spin_unlock(&cil->xc_push_lock);
} }
...@@ -707,8 +737,7 @@ xlog_cil_order_write( ...@@ -707,8 +737,7 @@ xlog_cil_order_write(
*/ */
static int static int
xlog_cil_write_commit_record( xlog_cil_write_commit_record(
struct xfs_cil_ctx *ctx, struct xfs_cil_ctx *ctx)
struct xlog_in_core **iclog)
{ {
struct xlog *log = ctx->cil->xc_log; struct xlog *log = ctx->cil->xc_log;
struct xfs_log_iovec reg = { struct xfs_log_iovec reg = {
...@@ -725,8 +754,7 @@ xlog_cil_write_commit_record( ...@@ -725,8 +754,7 @@ xlog_cil_write_commit_record(
if (xlog_is_shutdown(log)) if (xlog_is_shutdown(log))
return -EIO; return -EIO;
error = xlog_write(log, ctx, &vec, ctx->ticket, iclog, error = xlog_write(log, ctx, &vec, ctx->ticket, XLOG_COMMIT_TRANS);
XLOG_COMMIT_TRANS);
if (error) if (error)
xfs_force_shutdown(log->l_mp, SHUTDOWN_LOG_IO_ERROR); xfs_force_shutdown(log->l_mp, SHUTDOWN_LOG_IO_ERROR);
return error; return error;
...@@ -756,7 +784,6 @@ xlog_cil_push_work( ...@@ -756,7 +784,6 @@ xlog_cil_push_work(
struct xfs_log_vec *lv; struct xfs_log_vec *lv;
struct xfs_cil_ctx *ctx; struct xfs_cil_ctx *ctx;
struct xfs_cil_ctx *new_ctx; struct xfs_cil_ctx *new_ctx;
struct xlog_in_core *commit_iclog;
struct xlog_ticket *tic; struct xlog_ticket *tic;
int num_iovecs; int num_iovecs;
int error = 0; int error = 0;
...@@ -945,7 +972,7 @@ xlog_cil_push_work( ...@@ -945,7 +972,7 @@ xlog_cil_push_work(
*/ */
wait_for_completion(&bdev_flush); wait_for_completion(&bdev_flush);
error = xlog_write(log, ctx, &lvhdr, tic, NULL, XLOG_START_TRANS); error = xlog_write(log, ctx, &lvhdr, tic, XLOG_START_TRANS);
if (error) if (error)
goto out_abort_free_ticket; goto out_abort_free_ticket;
...@@ -953,36 +980,12 @@ xlog_cil_push_work( ...@@ -953,36 +980,12 @@ xlog_cil_push_work(
if (error) if (error)
goto out_abort_free_ticket; goto out_abort_free_ticket;
error = xlog_cil_write_commit_record(ctx, &commit_iclog); error = xlog_cil_write_commit_record(ctx);
if (error) if (error)
goto out_abort_free_ticket; goto out_abort_free_ticket;
xfs_log_ticket_ungrant(log, tic); xfs_log_ticket_ungrant(log, tic);
/*
* Once we attach the ctx to the iclog, it is effectively owned by the
* iclog and we can only use it while we still have an active reference
* to the iclog. i.e. once we call xlog_state_release_iclog() we can no
* longer safely reference the ctx.
*/
spin_lock(&log->l_icloglock);
if (xlog_is_shutdown(log)) {
spin_unlock(&log->l_icloglock);
goto out_abort;
}
ASSERT_ALWAYS(commit_iclog->ic_state == XLOG_STATE_ACTIVE ||
commit_iclog->ic_state == XLOG_STATE_WANT_SYNC);
list_add_tail(&ctx->iclog_entry, &commit_iclog->ic_callbacks);
/*
* now the checkpoint commit is complete and we've attached the
* callbacks to the iclog we can assign the commit LSN to the context
* and wake up anyone who is waiting for the commit to complete.
*/
spin_lock(&cil->xc_push_lock);
wake_up_all(&cil->xc_commit_wait);
spin_unlock(&cil->xc_push_lock);
/* /*
* If the checkpoint spans multiple iclogs, wait for all previous iclogs * If the checkpoint spans multiple iclogs, wait for all previous iclogs
* to complete before we submit the commit_iclog. We can't use state * to complete before we submit the commit_iclog. We can't use state
...@@ -995,17 +998,18 @@ xlog_cil_push_work( ...@@ -995,17 +998,18 @@ xlog_cil_push_work(
* iclog header lsn and compare it to the commit lsn to determine if we * iclog header lsn and compare it to the commit lsn to determine if we
* need to wait on iclogs or not. * need to wait on iclogs or not.
*/ */
spin_lock(&log->l_icloglock);
if (ctx->start_lsn != ctx->commit_lsn) { if (ctx->start_lsn != ctx->commit_lsn) {
xfs_lsn_t plsn; xfs_lsn_t plsn;
plsn = be64_to_cpu(commit_iclog->ic_prev->ic_header.h_lsn); plsn = be64_to_cpu(ctx->commit_iclog->ic_prev->ic_header.h_lsn);
if (plsn && XFS_LSN_CMP(plsn, ctx->commit_lsn) < 0) { if (plsn && XFS_LSN_CMP(plsn, ctx->commit_lsn) < 0) {
/* /*
* Waiting on ic_force_wait orders the completion of * Waiting on ic_force_wait orders the completion of
* iclogs older than ic_prev. Hence we only need to wait * iclogs older than ic_prev. Hence we only need to wait
* on the most recent older iclog here. * on the most recent older iclog here.
*/ */
xlog_wait_on_iclog(commit_iclog->ic_prev); xlog_wait_on_iclog(ctx->commit_iclog->ic_prev);
spin_lock(&log->l_icloglock); spin_lock(&log->l_icloglock);
} }
...@@ -1013,7 +1017,7 @@ xlog_cil_push_work( ...@@ -1013,7 +1017,7 @@ xlog_cil_push_work(
* We need to issue a pre-flush so that the ordering for this * We need to issue a pre-flush so that the ordering for this
* checkpoint is correctly preserved down to stable storage. * checkpoint is correctly preserved down to stable storage.
*/ */
commit_iclog->ic_flags |= XLOG_ICL_NEED_FLUSH; ctx->commit_iclog->ic_flags |= XLOG_ICL_NEED_FLUSH;
} }
/* /*
...@@ -1021,8 +1025,8 @@ xlog_cil_push_work( ...@@ -1021,8 +1025,8 @@ xlog_cil_push_work(
* journal IO vs metadata writeback IO is correctly ordered on stable * journal IO vs metadata writeback IO is correctly ordered on stable
* storage. * storage.
*/ */
commit_iclog->ic_flags |= XLOG_ICL_NEED_FUA; ctx->commit_iclog->ic_flags |= XLOG_ICL_NEED_FUA;
xlog_state_release_iclog(log, commit_iclog, preflush_tail_lsn); xlog_state_release_iclog(log, ctx->commit_iclog, preflush_tail_lsn);
/* Not safe to reference ctx now! */ /* Not safe to reference ctx now! */
...@@ -1037,9 +1041,15 @@ xlog_cil_push_work( ...@@ -1037,9 +1041,15 @@ xlog_cil_push_work(
out_abort_free_ticket: out_abort_free_ticket:
xfs_log_ticket_ungrant(log, tic); xfs_log_ticket_ungrant(log, tic);
out_abort:
ASSERT(xlog_is_shutdown(log)); ASSERT(xlog_is_shutdown(log));
xlog_cil_committed(ctx); if (!ctx->commit_iclog) {
xlog_cil_committed(ctx);
return;
}
spin_lock(&log->l_icloglock);
xlog_state_release_iclog(log, ctx->commit_iclog, 0);
/* Not safe to reference ctx now! */
spin_unlock(&log->l_icloglock);
} }
/* /*
......
...@@ -240,6 +240,7 @@ struct xfs_cil_ctx { ...@@ -240,6 +240,7 @@ struct xfs_cil_ctx {
xfs_csn_t sequence; /* chkpt sequence # */ xfs_csn_t sequence; /* chkpt sequence # */
xfs_lsn_t start_lsn; /* first LSN of chkpt commit */ xfs_lsn_t start_lsn; /* first LSN of chkpt commit */
xfs_lsn_t commit_lsn; /* chkpt commit record lsn */ xfs_lsn_t commit_lsn; /* chkpt commit record lsn */
struct xlog_in_core *commit_iclog;
struct xlog_ticket *ticket; /* chkpt ticket */ struct xlog_ticket *ticket; /* chkpt ticket */
int nvecs; /* number of regions */ int nvecs; /* number of regions */
int space_used; /* aggregate size of regions */ int space_used; /* aggregate size of regions */
...@@ -514,7 +515,7 @@ void xlog_print_tic_res(struct xfs_mount *mp, struct xlog_ticket *ticket); ...@@ -514,7 +515,7 @@ void xlog_print_tic_res(struct xfs_mount *mp, struct xlog_ticket *ticket);
void xlog_print_trans(struct xfs_trans *); void xlog_print_trans(struct xfs_trans *);
int xlog_write(struct xlog *log, struct xfs_cil_ctx *ctx, int xlog_write(struct xlog *log, struct xfs_cil_ctx *ctx,
struct xfs_log_vec *log_vector, struct xlog_ticket *tic, struct xfs_log_vec *log_vector, struct xlog_ticket *tic,
struct xlog_in_core **commit_iclog, uint optype); uint optype);
void xfs_log_ticket_ungrant(struct xlog *log, struct xlog_ticket *ticket); void xfs_log_ticket_ungrant(struct xlog *log, struct xlog_ticket *ticket);
void xfs_log_ticket_regrant(struct xlog *log, struct xlog_ticket *ticket); void xfs_log_ticket_regrant(struct xlog *log, struct xlog_ticket *ticket);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment