Commit 4c2d542f authored by Dave Chinner's avatar Dave Chinner Committed by Ben Myers

xfs: Do background CIL flushes via a workqueue

Doing background CIL flushes adds significant latency to whatever
async transaction that triggers it. To avoid blocking async
transactions on things like waiting for log buffer IO to complete,
move the CIL push off into a workqueue.  By moving the push work
into a workqueue, we remove all the latency that the commit adds
from the foreground transaction commit path. This also means that
single threaded workloads won't do the CIL push procssing, leaving
them more CPU to do more async transactions.

To do this, we need to keep track of the sequence number we have
pushed work for. This avoids having many transaction commits
attempting to schedule work for the same sequence, and ensures that
we only ever have one push (background or forced) in progress at a
time. It also means that we don't need to take the CIL lock in write
mode to check for potential background push races, which reduces
lock contention.

To avoid potential issues with "smart" IO schedulers, don't use the
workqueue for log force triggered flushes. Instead, do them directly
so that the log IO is done directly by the process issuing the log
force and so doesn't get stuck on IO elevator queue idling
incorrectly delaying the log IO from the workqueue.
Signed-off-by: default avatarDave Chinner <dchinner@redhat.com>
Reviewed-by: default avatarMark Tinguely <tinguely@sgi.com>
Signed-off-by: default avatarBen Myers <bpm@sgi.com>
parent 04913fdd
......@@ -31,58 +31,6 @@
#include "xfs_alloc.h"
#include "xfs_discard.h"
/*
* Perform initial CIL structure initialisation.
*/
int
xlog_cil_init(
struct log *log)
{
struct xfs_cil *cil;
struct xfs_cil_ctx *ctx;
cil = kmem_zalloc(sizeof(*cil), KM_SLEEP|KM_MAYFAIL);
if (!cil)
return ENOMEM;
ctx = kmem_zalloc(sizeof(*ctx), KM_SLEEP|KM_MAYFAIL);
if (!ctx) {
kmem_free(cil);
return ENOMEM;
}
INIT_LIST_HEAD(&cil->xc_cil);
INIT_LIST_HEAD(&cil->xc_committing);
spin_lock_init(&cil->xc_cil_lock);
init_rwsem(&cil->xc_ctx_lock);
init_waitqueue_head(&cil->xc_commit_wait);
INIT_LIST_HEAD(&ctx->committing);
INIT_LIST_HEAD(&ctx->busy_extents);
ctx->sequence = 1;
ctx->cil = cil;
cil->xc_ctx = ctx;
cil->xc_current_sequence = ctx->sequence;
cil->xc_log = log;
log->l_cilp = cil;
return 0;
}
void
xlog_cil_destroy(
struct log *log)
{
if (log->l_cilp->xc_ctx) {
if (log->l_cilp->xc_ctx->ticket)
xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket);
kmem_free(log->l_cilp->xc_ctx);
}
ASSERT(list_empty(&log->l_cilp->xc_cil));
kmem_free(log->l_cilp);
}
/*
* Allocate a new ticket. Failing to get a new ticket makes it really hard to
* recover, so we don't allow failure here. Also, we allocate in a context that
......@@ -426,8 +374,7 @@ xlog_cil_committed(
*/
STATIC int
xlog_cil_push(
struct log *log,
xfs_lsn_t push_seq)
struct log *log)
{
struct xfs_cil *cil = log->l_cilp;
struct xfs_log_vec *lv;
......@@ -443,39 +390,36 @@ xlog_cil_push(
struct xfs_log_iovec lhdr;
struct xfs_log_vec lvhdr = { NULL };
xfs_lsn_t commit_lsn;
xfs_lsn_t push_seq;
if (!cil)
return 0;
ASSERT(!push_seq || push_seq <= cil->xc_ctx->sequence);
new_ctx = kmem_zalloc(sizeof(*new_ctx), KM_SLEEP|KM_NOFS);
new_ctx->ticket = xlog_cil_ticket_alloc(log);
/*
* Lock out transaction commit, but don't block for background pushes
* unless we are well over the CIL space limit. See the definition of
* XLOG_CIL_HARD_SPACE_LIMIT() for the full explanation of the logic
* used here.
*/
if (!down_write_trylock(&cil->xc_ctx_lock)) {
if (!push_seq &&
cil->xc_ctx->space_used < XLOG_CIL_HARD_SPACE_LIMIT(log))
goto out_free_ticket;
down_write(&cil->xc_ctx_lock);
}
down_write(&cil->xc_ctx_lock);
ctx = cil->xc_ctx;
/* check if we've anything to push */
if (list_empty(&cil->xc_cil))
goto out_skip;
spin_lock(&cil->xc_cil_lock);
push_seq = cil->xc_push_seq;
ASSERT(push_seq <= ctx->sequence);
/* check for spurious background flush */
if (!push_seq && cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log))
/*
* Check if we've anything to push. If there is nothing, then we don't
* move on to a new sequence number and so we have to be able to push
* this sequence again later.
*/
if (list_empty(&cil->xc_cil)) {
cil->xc_push_seq = 0;
spin_unlock(&cil->xc_cil_lock);
goto out_skip;
}
spin_unlock(&cil->xc_cil_lock);
/* check for a previously pushed seqeunce */
if (push_seq && push_seq < cil->xc_ctx->sequence)
if (push_seq < cil->xc_ctx->sequence)
goto out_skip;
/*
......@@ -629,7 +573,6 @@ xlog_cil_push(
out_skip:
up_write(&cil->xc_ctx_lock);
out_free_ticket:
xfs_log_ticket_put(new_ctx->ticket);
kmem_free(new_ctx);
return 0;
......@@ -641,6 +584,82 @@ xlog_cil_push(
return XFS_ERROR(EIO);
}
static void
xlog_cil_push_work(
struct work_struct *work)
{
struct xfs_cil *cil = container_of(work, struct xfs_cil,
xc_push_work);
xlog_cil_push(cil->xc_log);
}
/*
* We need to push CIL every so often so we don't cache more than we can fit in
* the log. The limit really is that a checkpoint can't be more than half the
* log (the current checkpoint is not allowed to overwrite the previous
* checkpoint), but commit latency and memory usage limit this to a smaller
* size.
*/
static void
xlog_cil_push_background(
struct log *log)
{
struct xfs_cil *cil = log->l_cilp;
/*
* The cil won't be empty because we are called while holding the
* context lock so whatever we added to the CIL will still be there
*/
ASSERT(!list_empty(&cil->xc_cil));
/*
* don't do a background push if we haven't used up all the
* space available yet.
*/
if (cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log))
return;
spin_lock(&cil->xc_cil_lock);
if (cil->xc_push_seq < cil->xc_current_sequence) {
cil->xc_push_seq = cil->xc_current_sequence;
queue_work(log->l_mp->m_cil_workqueue, &cil->xc_push_work);
}
spin_unlock(&cil->xc_cil_lock);
}
static void
xlog_cil_push_foreground(
struct log *log,
xfs_lsn_t push_seq)
{
struct xfs_cil *cil = log->l_cilp;
if (!cil)
return;
ASSERT(push_seq && push_seq <= cil->xc_current_sequence);
/* start on any pending background push to minimise wait time on it */
flush_work(&cil->xc_push_work);
/*
* If the CIL is empty or we've already pushed the sequence then
* there's no work we need to do.
*/
spin_lock(&cil->xc_cil_lock);
if (list_empty(&cil->xc_cil) || push_seq <= cil->xc_push_seq) {
spin_unlock(&cil->xc_cil_lock);
return;
}
cil->xc_push_seq = push_seq;
spin_unlock(&cil->xc_cil_lock);
/* do the push now */
xlog_cil_push(log);
}
/*
* Commit a transaction with the given vector to the Committed Item List.
*
......@@ -667,7 +686,6 @@ xfs_log_commit_cil(
{
struct log *log = mp->m_log;
int log_flags = 0;
int push = 0;
struct xfs_log_vec *log_vector;
if (flags & XFS_TRANS_RELEASE_LOG_RES)
......@@ -719,21 +737,9 @@ xfs_log_commit_cil(
*/
xfs_trans_free_items(tp, *commit_lsn, 0);
/* check for background commit before unlock */
if (log->l_cilp->xc_ctx->space_used > XLOG_CIL_SPACE_LIMIT(log))
push = 1;
xlog_cil_push_background(log);
up_read(&log->l_cilp->xc_ctx_lock);
/*
* We need to push CIL every so often so we don't cache more than we
* can fit in the log. The limit really is that a checkpoint can't be
* more than half the log (the current checkpoint is not allowed to
* overwrite the previous checkpoint), but commit latency and memory
* usage limit this to a smaller size in most cases.
*/
if (push)
xlog_cil_push(log, 0);
return 0;
}
......@@ -746,9 +752,6 @@ xfs_log_commit_cil(
*
* We return the current commit lsn to allow the callers to determine if a
* iclog flush is necessary following this call.
*
* XXX: Initially, just push the CIL unconditionally and return whatever
* commit lsn is there. It'll be empty, so this is broken for now.
*/
xfs_lsn_t
xlog_cil_force_lsn(
......@@ -766,8 +769,7 @@ xlog_cil_force_lsn(
* xlog_cil_push() handles racing pushes for the same sequence,
* so no need to deal with it here.
*/
if (sequence == cil->xc_current_sequence)
xlog_cil_push(log, sequence);
xlog_cil_push_foreground(log, sequence);
/*
* See if we can find a previous sequence still committing.
......@@ -826,3 +828,57 @@ xfs_log_item_in_current_chkpt(
return false;
return true;
}
/*
* Perform initial CIL structure initialisation.
*/
int
xlog_cil_init(
struct log *log)
{
struct xfs_cil *cil;
struct xfs_cil_ctx *ctx;
cil = kmem_zalloc(sizeof(*cil), KM_SLEEP|KM_MAYFAIL);
if (!cil)
return ENOMEM;
ctx = kmem_zalloc(sizeof(*ctx), KM_SLEEP|KM_MAYFAIL);
if (!ctx) {
kmem_free(cil);
return ENOMEM;
}
INIT_WORK(&cil->xc_push_work, xlog_cil_push_work);
INIT_LIST_HEAD(&cil->xc_cil);
INIT_LIST_HEAD(&cil->xc_committing);
spin_lock_init(&cil->xc_cil_lock);
init_rwsem(&cil->xc_ctx_lock);
init_waitqueue_head(&cil->xc_commit_wait);
INIT_LIST_HEAD(&ctx->committing);
INIT_LIST_HEAD(&ctx->busy_extents);
ctx->sequence = 1;
ctx->cil = cil;
cil->xc_ctx = ctx;
cil->xc_current_sequence = ctx->sequence;
cil->xc_log = log;
log->l_cilp = cil;
return 0;
}
void
xlog_cil_destroy(
struct log *log)
{
if (log->l_cilp->xc_ctx) {
if (log->l_cilp->xc_ctx->ticket)
xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket);
kmem_free(log->l_cilp->xc_ctx);
}
ASSERT(list_empty(&log->l_cilp->xc_cil));
kmem_free(log->l_cilp);
}
......@@ -417,6 +417,8 @@ struct xfs_cil {
struct list_head xc_committing;
wait_queue_head_t xc_commit_wait;
xfs_lsn_t xc_current_sequence;
struct work_struct xc_push_work;
xfs_lsn_t xc_push_seq;
};
/*
......
......@@ -214,6 +214,7 @@ typedef struct xfs_mount {
struct workqueue_struct *m_data_workqueue;
struct workqueue_struct *m_unwritten_workqueue;
struct workqueue_struct *m_cil_workqueue;
} xfs_mount_t;
/*
......
......@@ -773,8 +773,14 @@ xfs_init_mount_workqueues(
if (!mp->m_unwritten_workqueue)
goto out_destroy_data_iodone_queue;
mp->m_cil_workqueue = alloc_workqueue("xfs-cil/%s",
WQ_MEM_RECLAIM, 0, mp->m_fsname);
if (!mp->m_cil_workqueue)
goto out_destroy_unwritten;
return 0;
out_destroy_unwritten:
destroy_workqueue(mp->m_unwritten_workqueue);
out_destroy_data_iodone_queue:
destroy_workqueue(mp->m_data_workqueue);
out:
......@@ -785,6 +791,7 @@ STATIC void
xfs_destroy_mount_workqueues(
struct xfs_mount *mp)
{
destroy_workqueue(mp->m_cil_workqueue);
destroy_workqueue(mp->m_data_workqueue);
destroy_workqueue(mp->m_unwritten_workqueue);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment