Commit 83ba7b07 authored by Christoph Hellwig's avatar Christoph Hellwig Committed by Jens Axboe

writeback: simplify the write back thread queue

First remove items from work_list as soon as we start working on them.  This
means we don't have to track any pending or visited state and can get
rid of all the RCU magic freeing the work items - we can simply free
them once the operation has finished.  Second use a real completion for
tracking synchronous requests - if the caller sets the completion pointer
we complete it, otherwise use it as a boolean indicator that we can free
the work item directly.  Third unify struct wb_writeback_args and struct
bdi_work into a single data structure, wb_writeback_work.  Previously we
set all parameters in a struct wb_writeback_args, copied it into
struct bdi_work, and copied it again onto the stack to use it there.  Instead,
just allocate one structure dynamically or on the stack and use it
all the way through the stack.
Signed-off-by: default avatarChristoph Hellwig <hch@lst.de>
Signed-off-by: default avatarJens Axboe <jaxboe@fusionio.com>
parent edadfb10
...@@ -38,43 +38,18 @@ int nr_pdflush_threads; ...@@ -38,43 +38,18 @@ int nr_pdflush_threads;
/* /*
* Passed into wb_writeback(), essentially a subset of writeback_control * Passed into wb_writeback(), essentially a subset of writeback_control
*/ */
struct wb_writeback_args { struct wb_writeback_work {
long nr_pages; long nr_pages;
struct super_block *sb; struct super_block *sb;
enum writeback_sync_modes sync_mode; enum writeback_sync_modes sync_mode;
unsigned int for_kupdate:1; unsigned int for_kupdate:1;
unsigned int range_cyclic:1; unsigned int range_cyclic:1;
unsigned int for_background:1; unsigned int for_background:1;
};
/*
* Work items for the bdi_writeback threads
*/
struct bdi_work {
struct list_head list; /* pending work list */ struct list_head list; /* pending work list */
struct rcu_head rcu_head; /* for RCU free/clear of work */ struct completion *done; /* set if the caller waits */
unsigned long seen; /* threads that have seen this work */
atomic_t pending; /* number of threads still to do work */
struct wb_writeback_args args; /* writeback arguments */
unsigned long state; /* flag bits, see WS_* */
}; };
enum {
WS_INPROGRESS = 0,
WS_ONSTACK,
};
static inline void bdi_work_init(struct bdi_work *work,
struct wb_writeback_args *args)
{
INIT_RCU_HEAD(&work->rcu_head);
work->args = *args;
__set_bit(WS_INPROGRESS, &work->state);
}
/** /**
* writeback_in_progress - determine whether there is writeback in progress * writeback_in_progress - determine whether there is writeback in progress
* @bdi: the device's backing_dev_info structure. * @bdi: the device's backing_dev_info structure.
...@@ -87,49 +62,11 @@ int writeback_in_progress(struct backing_dev_info *bdi) ...@@ -87,49 +62,11 @@ int writeback_in_progress(struct backing_dev_info *bdi)
return !list_empty(&bdi->work_list); return !list_empty(&bdi->work_list);
} }
static void bdi_work_free(struct rcu_head *head) static void bdi_queue_work(struct backing_dev_info *bdi,
{ struct wb_writeback_work *work)
struct bdi_work *work = container_of(head, struct bdi_work, rcu_head);
clear_bit(WS_INPROGRESS, &work->state);
smp_mb__after_clear_bit();
wake_up_bit(&work->state, WS_INPROGRESS);
if (!test_bit(WS_ONSTACK, &work->state))
kfree(work);
}
static void wb_clear_pending(struct bdi_writeback *wb, struct bdi_work *work)
{
/*
* The caller has retrieved the work arguments from this work,
* drop our reference. If this is the last ref, delete and free it
*/
if (atomic_dec_and_test(&work->pending)) {
struct backing_dev_info *bdi = wb->bdi;
spin_lock(&bdi->wb_lock);
list_del_rcu(&work->list);
spin_unlock(&bdi->wb_lock);
call_rcu(&work->rcu_head, bdi_work_free);
}
}
static void bdi_queue_work(struct backing_dev_info *bdi, struct bdi_work *work)
{ {
work->seen = bdi->wb_mask;
BUG_ON(!work->seen);
atomic_set(&work->pending, bdi->wb_cnt);
BUG_ON(!bdi->wb_cnt);
/*
* list_add_tail_rcu() contains the necessary barriers to
* make sure the above stores are seen before the item is
* noticed on the list
*/
spin_lock(&bdi->wb_lock); spin_lock(&bdi->wb_lock);
list_add_tail_rcu(&work->list, &bdi->work_list); list_add_tail(&work->list, &bdi->work_list);
spin_unlock(&bdi->wb_lock); spin_unlock(&bdi->wb_lock);
/* /*
...@@ -146,55 +83,29 @@ static void bdi_queue_work(struct backing_dev_info *bdi, struct bdi_work *work) ...@@ -146,55 +83,29 @@ static void bdi_queue_work(struct backing_dev_info *bdi, struct bdi_work *work)
} }
} }
/* static void
* Used for on-stack allocated work items. The caller needs to wait until __bdi_start_writeback(struct backing_dev_info *bdi, long nr_pages,
* the wb threads have acked the work before it's safe to continue. bool range_cyclic, bool for_background)
*/
static void bdi_wait_on_work_done(struct bdi_work *work)
{ {
wait_on_bit(&work->state, WS_INPROGRESS, bdi_sched_wait, struct wb_writeback_work *work;
TASK_UNINTERRUPTIBLE);
}
static void bdi_alloc_queue_work(struct backing_dev_info *bdi,
struct wb_writeback_args *args)
{
struct bdi_work *work;
/* /*
* This is WB_SYNC_NONE writeback, so if allocation fails just * This is WB_SYNC_NONE writeback, so if allocation fails just
* wakeup the thread for old dirty data writeback * wakeup the thread for old dirty data writeback
*/ */
work = kmalloc(sizeof(*work), GFP_ATOMIC); work = kzalloc(sizeof(*work), GFP_ATOMIC);
if (work) { if (!work) {
bdi_work_init(work, args); if (bdi->wb.task)
bdi_queue_work(bdi, work); wake_up_process(bdi->wb.task);
} else { return;
struct bdi_writeback *wb = &bdi->wb;
if (wb->task)
wake_up_process(wb->task);
} }
}
/** work->sync_mode = WB_SYNC_NONE;
* bdi_queue_work_onstack - start and wait for writeback work->nr_pages = nr_pages;
* @args: parameters to control the work queue writeback work->range_cyclic = range_cyclic;
* work->for_background = for_background;
* Description:
* This function initiates writeback and waits for the operation to
* complete. Callers must hold the sb s_umount semaphore for
* reading, to avoid having the super disappear before we are done.
*/
static void bdi_queue_work_onstack(struct wb_writeback_args *args)
{
struct bdi_work work;
bdi_work_init(&work, args);
__set_bit(WS_ONSTACK, &work.state);
bdi_queue_work(args->sb->s_bdi, &work); bdi_queue_work(bdi, work);
bdi_wait_on_work_done(&work);
} }
/** /**
...@@ -210,13 +121,7 @@ static void bdi_queue_work_onstack(struct wb_writeback_args *args) ...@@ -210,13 +121,7 @@ static void bdi_queue_work_onstack(struct wb_writeback_args *args)
*/ */
void bdi_start_writeback(struct backing_dev_info *bdi, long nr_pages) void bdi_start_writeback(struct backing_dev_info *bdi, long nr_pages)
{ {
struct wb_writeback_args args = { __bdi_start_writeback(bdi, nr_pages, true, false);
.sync_mode = WB_SYNC_NONE,
.nr_pages = nr_pages,
.range_cyclic = 1,
};
bdi_alloc_queue_work(bdi, &args);
} }
/** /**
...@@ -230,13 +135,7 @@ void bdi_start_writeback(struct backing_dev_info *bdi, long nr_pages) ...@@ -230,13 +135,7 @@ void bdi_start_writeback(struct backing_dev_info *bdi, long nr_pages)
*/ */
void bdi_start_background_writeback(struct backing_dev_info *bdi) void bdi_start_background_writeback(struct backing_dev_info *bdi)
{ {
struct wb_writeback_args args = { __bdi_start_writeback(bdi, LONG_MAX, true, true);
.sync_mode = WB_SYNC_NONE,
.nr_pages = LONG_MAX,
.for_background = 1,
.range_cyclic = 1,
};
bdi_alloc_queue_work(bdi, &args);
} }
/* /*
...@@ -703,14 +602,14 @@ static inline bool over_bground_thresh(void) ...@@ -703,14 +602,14 @@ static inline bool over_bground_thresh(void)
* all dirty pages if they are all attached to "old" mappings. * all dirty pages if they are all attached to "old" mappings.
*/ */
static long wb_writeback(struct bdi_writeback *wb, static long wb_writeback(struct bdi_writeback *wb,
struct wb_writeback_args *args) struct wb_writeback_work *work)
{ {
struct writeback_control wbc = { struct writeback_control wbc = {
.sync_mode = args->sync_mode, .sync_mode = work->sync_mode,
.older_than_this = NULL, .older_than_this = NULL,
.for_kupdate = args->for_kupdate, .for_kupdate = work->for_kupdate,
.for_background = args->for_background, .for_background = work->for_background,
.range_cyclic = args->range_cyclic, .range_cyclic = work->range_cyclic,
}; };
unsigned long oldest_jif; unsigned long oldest_jif;
long wrote = 0; long wrote = 0;
...@@ -730,24 +629,24 @@ static long wb_writeback(struct bdi_writeback *wb, ...@@ -730,24 +629,24 @@ static long wb_writeback(struct bdi_writeback *wb,
/* /*
* Stop writeback when nr_pages has been consumed * Stop writeback when nr_pages has been consumed
*/ */
if (args->nr_pages <= 0) if (work->nr_pages <= 0)
break; break;
/* /*
* For background writeout, stop when we are below the * For background writeout, stop when we are below the
* background dirty threshold * background dirty threshold
*/ */
if (args->for_background && !over_bground_thresh()) if (work->for_background && !over_bground_thresh())
break; break;
wbc.more_io = 0; wbc.more_io = 0;
wbc.nr_to_write = MAX_WRITEBACK_PAGES; wbc.nr_to_write = MAX_WRITEBACK_PAGES;
wbc.pages_skipped = 0; wbc.pages_skipped = 0;
if (args->sb) if (work->sb)
__writeback_inodes_sb(args->sb, wb, &wbc); __writeback_inodes_sb(work->sb, wb, &wbc);
else else
writeback_inodes_wb(wb, &wbc); writeback_inodes_wb(wb, &wbc);
args->nr_pages -= MAX_WRITEBACK_PAGES - wbc.nr_to_write; work->nr_pages -= MAX_WRITEBACK_PAGES - wbc.nr_to_write;
wrote += MAX_WRITEBACK_PAGES - wbc.nr_to_write; wrote += MAX_WRITEBACK_PAGES - wbc.nr_to_write;
/* /*
...@@ -783,31 +682,21 @@ static long wb_writeback(struct bdi_writeback *wb, ...@@ -783,31 +682,21 @@ static long wb_writeback(struct bdi_writeback *wb,
} }
/* /*
* Return the next bdi_work struct that hasn't been processed by this * Return the next wb_writeback_work struct that hasn't been processed yet.
* wb thread yet. ->seen is initially set for each thread that exists */
* for this device, when a thread first notices a piece of work it static struct wb_writeback_work *
* clears its bit. Depending on writeback type, the thread will notify get_next_work_item(struct backing_dev_info *bdi, struct bdi_writeback *wb)
* completion on either receiving the work (WB_SYNC_NONE) or after
* it is done (WB_SYNC_ALL).
*/
static struct bdi_work *get_next_work_item(struct backing_dev_info *bdi,
struct bdi_writeback *wb)
{ {
struct bdi_work *work, *ret = NULL; struct wb_writeback_work *work = NULL;
rcu_read_lock();
list_for_each_entry_rcu(work, &bdi->work_list, list) { spin_lock(&bdi->wb_lock);
if (!test_bit(wb->nr, &work->seen)) if (!list_empty(&bdi->work_list)) {
continue; work = list_entry(bdi->work_list.next,
clear_bit(wb->nr, &work->seen); struct wb_writeback_work, list);
list_del_init(&work->list);
ret = work;
break;
} }
spin_unlock(&bdi->wb_lock);
rcu_read_unlock(); return work;
return ret;
} }
static long wb_check_old_data_flush(struct bdi_writeback *wb) static long wb_check_old_data_flush(struct bdi_writeback *wb)
...@@ -832,14 +721,14 @@ static long wb_check_old_data_flush(struct bdi_writeback *wb) ...@@ -832,14 +721,14 @@ static long wb_check_old_data_flush(struct bdi_writeback *wb)
(inodes_stat.nr_inodes - inodes_stat.nr_unused); (inodes_stat.nr_inodes - inodes_stat.nr_unused);
if (nr_pages) { if (nr_pages) {
struct wb_writeback_args args = { struct wb_writeback_work work = {
.nr_pages = nr_pages, .nr_pages = nr_pages,
.sync_mode = WB_SYNC_NONE, .sync_mode = WB_SYNC_NONE,
.for_kupdate = 1, .for_kupdate = 1,
.range_cyclic = 1, .range_cyclic = 1,
}; };
return wb_writeback(wb, &args); return wb_writeback(wb, &work);
} }
return 0; return 0;
...@@ -851,33 +740,27 @@ static long wb_check_old_data_flush(struct bdi_writeback *wb) ...@@ -851,33 +740,27 @@ static long wb_check_old_data_flush(struct bdi_writeback *wb)
long wb_do_writeback(struct bdi_writeback *wb, int force_wait) long wb_do_writeback(struct bdi_writeback *wb, int force_wait)
{ {
struct backing_dev_info *bdi = wb->bdi; struct backing_dev_info *bdi = wb->bdi;
struct bdi_work *work; struct wb_writeback_work *work;
long wrote = 0; long wrote = 0;
while ((work = get_next_work_item(bdi, wb)) != NULL) { while ((work = get_next_work_item(bdi, wb)) != NULL) {
struct wb_writeback_args args = work->args;
/* /*
* Override sync mode, in case we must wait for completion * Override sync mode, in case we must wait for completion
* because this thread is exiting now.
*/ */
if (force_wait) if (force_wait)
work->args.sync_mode = args.sync_mode = WB_SYNC_ALL; work->sync_mode = WB_SYNC_ALL;
/*
* If this isn't a data integrity operation, just notify
* that we have seen this work and we are now starting it.
*/
if (!test_bit(WS_ONSTACK, &work->state))
wb_clear_pending(wb, work);
wrote += wb_writeback(wb, &args); wrote += wb_writeback(wb, work);
/* /*
* This is a data integrity writeback, so only do the * Notify the caller of completion if this is a synchronous
* notification when we have completed the work. * work item, otherwise just free it.
*/ */
if (test_bit(WS_ONSTACK, &work->state)) if (work->done)
wb_clear_pending(wb, work); complete(work->done);
else
kfree(work);
} }
/* /*
...@@ -940,14 +823,9 @@ int bdi_writeback_task(struct bdi_writeback *wb) ...@@ -940,14 +823,9 @@ int bdi_writeback_task(struct bdi_writeback *wb)
void wakeup_flusher_threads(long nr_pages) void wakeup_flusher_threads(long nr_pages)
{ {
struct backing_dev_info *bdi; struct backing_dev_info *bdi;
struct wb_writeback_args args = {
.sync_mode = WB_SYNC_NONE,
};
if (nr_pages) { if (!nr_pages) {
args.nr_pages = nr_pages; nr_pages = global_page_state(NR_FILE_DIRTY) +
} else {
args.nr_pages = global_page_state(NR_FILE_DIRTY) +
global_page_state(NR_UNSTABLE_NFS); global_page_state(NR_UNSTABLE_NFS);
} }
...@@ -955,7 +833,7 @@ void wakeup_flusher_threads(long nr_pages) ...@@ -955,7 +833,7 @@ void wakeup_flusher_threads(long nr_pages)
list_for_each_entry_rcu(bdi, &bdi_list, bdi_list) { list_for_each_entry_rcu(bdi, &bdi_list, bdi_list) {
if (!bdi_has_dirty_io(bdi)) if (!bdi_has_dirty_io(bdi))
continue; continue;
bdi_alloc_queue_work(bdi, &args); __bdi_start_writeback(bdi, nr_pages, false, false);
} }
rcu_read_unlock(); rcu_read_unlock();
} }
...@@ -1164,17 +1042,20 @@ void writeback_inodes_sb(struct super_block *sb) ...@@ -1164,17 +1042,20 @@ void writeback_inodes_sb(struct super_block *sb)
{ {
unsigned long nr_dirty = global_page_state(NR_FILE_DIRTY); unsigned long nr_dirty = global_page_state(NR_FILE_DIRTY);
unsigned long nr_unstable = global_page_state(NR_UNSTABLE_NFS); unsigned long nr_unstable = global_page_state(NR_UNSTABLE_NFS);
struct wb_writeback_args args = { DECLARE_COMPLETION_ONSTACK(done);
struct wb_writeback_work work = {
.sb = sb, .sb = sb,
.sync_mode = WB_SYNC_NONE, .sync_mode = WB_SYNC_NONE,
.done = &done,
}; };
WARN_ON(!rwsem_is_locked(&sb->s_umount)); WARN_ON(!rwsem_is_locked(&sb->s_umount));
args.nr_pages = nr_dirty + nr_unstable + work.nr_pages = nr_dirty + nr_unstable +
(inodes_stat.nr_inodes - inodes_stat.nr_unused); (inodes_stat.nr_inodes - inodes_stat.nr_unused);
bdi_queue_work_onstack(&args); bdi_queue_work(sb->s_bdi, &work);
wait_for_completion(&done);
} }
EXPORT_SYMBOL(writeback_inodes_sb); EXPORT_SYMBOL(writeback_inodes_sb);
...@@ -1206,16 +1087,20 @@ EXPORT_SYMBOL(writeback_inodes_sb_if_idle); ...@@ -1206,16 +1087,20 @@ EXPORT_SYMBOL(writeback_inodes_sb_if_idle);
*/ */
void sync_inodes_sb(struct super_block *sb) void sync_inodes_sb(struct super_block *sb)
{ {
struct wb_writeback_args args = { DECLARE_COMPLETION_ONSTACK(done);
struct wb_writeback_work work = {
.sb = sb, .sb = sb,
.sync_mode = WB_SYNC_ALL, .sync_mode = WB_SYNC_ALL,
.nr_pages = LONG_MAX, .nr_pages = LONG_MAX,
.range_cyclic = 0, .range_cyclic = 0,
.done = &done,
}; };
WARN_ON(!rwsem_is_locked(&sb->s_umount)); WARN_ON(!rwsem_is_locked(&sb->s_umount));
bdi_queue_work_onstack(&args); bdi_queue_work(sb->s_bdi, &work);
wait_for_completion(&done);
wait_sb_inodes(sb); wait_sb_inodes(sb);
} }
EXPORT_SYMBOL(sync_inodes_sb); EXPORT_SYMBOL(sync_inodes_sb);
......
...@@ -82,8 +82,6 @@ struct backing_dev_info { ...@@ -82,8 +82,6 @@ struct backing_dev_info {
struct bdi_writeback wb; /* default writeback info for this bdi */ struct bdi_writeback wb; /* default writeback info for this bdi */
spinlock_t wb_lock; /* protects update side of wb_list */ spinlock_t wb_lock; /* protects update side of wb_list */
struct list_head wb_list; /* the flusher threads hanging off this bdi */ struct list_head wb_list; /* the flusher threads hanging off this bdi */
unsigned long wb_mask; /* bitmask of registered tasks */
unsigned int wb_cnt; /* number of registered tasks */
struct list_head work_list; struct list_head work_list;
......
...@@ -104,15 +104,13 @@ static int bdi_debug_stats_show(struct seq_file *m, void *v) ...@@ -104,15 +104,13 @@ static int bdi_debug_stats_show(struct seq_file *m, void *v)
"b_more_io: %8lu\n" "b_more_io: %8lu\n"
"bdi_list: %8u\n" "bdi_list: %8u\n"
"state: %8lx\n" "state: %8lx\n"
"wb_mask: %8lx\n" "wb_list: %8u\n",
"wb_list: %8u\n"
"wb_cnt: %8u\n",
(unsigned long) K(bdi_stat(bdi, BDI_WRITEBACK)), (unsigned long) K(bdi_stat(bdi, BDI_WRITEBACK)),
(unsigned long) K(bdi_stat(bdi, BDI_RECLAIMABLE)), (unsigned long) K(bdi_stat(bdi, BDI_RECLAIMABLE)),
K(bdi_thresh), K(dirty_thresh), K(bdi_thresh), K(dirty_thresh),
K(background_thresh), nr_wb, nr_dirty, nr_io, nr_more_io, K(background_thresh), nr_wb, nr_dirty, nr_io, nr_more_io,
!list_empty(&bdi->bdi_list), bdi->state, bdi->wb_mask, !list_empty(&bdi->bdi_list), bdi->state,
!list_empty(&bdi->wb_list), bdi->wb_cnt); !list_empty(&bdi->wb_list));
#undef K #undef K
return 0; return 0;
...@@ -674,12 +672,6 @@ int bdi_init(struct backing_dev_info *bdi) ...@@ -674,12 +672,6 @@ int bdi_init(struct backing_dev_info *bdi)
bdi_wb_init(&bdi->wb, bdi); bdi_wb_init(&bdi->wb, bdi);
/*
* Just one thread support for now, hard code mask and count
*/
bdi->wb_mask = 1;
bdi->wb_cnt = 1;
for (i = 0; i < NR_BDI_STAT_ITEMS; i++) { for (i = 0; i < NR_BDI_STAT_ITEMS; i++) {
err = percpu_counter_init(&bdi->bdi_stat[i], 0); err = percpu_counter_init(&bdi->bdi_stat[i], 0);
if (err) if (err)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment