Commit fc2e4d70 authored by Oleg Nesterov's avatar Oleg Nesterov Committed by Linus Torvalds

reimplement flush_workqueue()

Remove ->remove_sequence, ->insert_sequence, and ->work_done from struct
cpu_workqueue_struct.  To implement flush_workqueue() we can queue a
barrier work on each CPU and wait for its completition.

The barrier is queued under workqueue_mutex to ensure that per cpu
wq->cpu_wq is alive, we drop this mutex before going to sleep.  If CPU goes
down while we are waiting for completition, take_over_work() will move the
barrier on another CPU, and the handler will wake up us eventually.
Signed-off-by: default avatarOleg Nesterov <oleg@tv-sign.ru>
Signed-off-by: default avatarAndrew Morton <akpm@linux-foundation.org>
Signed-off-by: default avatarLinus Torvalds <torvalds@linux-foundation.org>
parent e18f3ffb
...@@ -36,23 +36,13 @@ ...@@ -36,23 +36,13 @@
/* /*
* The per-CPU workqueue (if single thread, we always use the first * The per-CPU workqueue (if single thread, we always use the first
* possible cpu). * possible cpu).
*
* The sequence counters are for flush_scheduled_work(). It wants to wait
* until all currently-scheduled works are completed, but it doesn't
* want to be livelocked by new, incoming ones. So it waits until
* remove_sequence is >= the insert_sequence which pertained when
* flush_scheduled_work() was called.
*/ */
struct cpu_workqueue_struct { struct cpu_workqueue_struct {
spinlock_t lock; spinlock_t lock;
long remove_sequence; /* Least-recently added (next to run) */
long insert_sequence; /* Next to add */
struct list_head worklist; struct list_head worklist;
wait_queue_head_t more_work; wait_queue_head_t more_work;
wait_queue_head_t work_done;
struct workqueue_struct *wq; struct workqueue_struct *wq;
struct task_struct *thread; struct task_struct *thread;
...@@ -138,8 +128,6 @@ static int __run_work(struct cpu_workqueue_struct *cwq, struct work_struct *work ...@@ -138,8 +128,6 @@ static int __run_work(struct cpu_workqueue_struct *cwq, struct work_struct *work
f(work); f(work);
spin_lock_irqsave(&cwq->lock, flags); spin_lock_irqsave(&cwq->lock, flags);
cwq->remove_sequence++;
wake_up(&cwq->work_done);
ret = 1; ret = 1;
} }
spin_unlock_irqrestore(&cwq->lock, flags); spin_unlock_irqrestore(&cwq->lock, flags);
...@@ -187,7 +175,6 @@ static void __queue_work(struct cpu_workqueue_struct *cwq, ...@@ -187,7 +175,6 @@ static void __queue_work(struct cpu_workqueue_struct *cwq,
spin_lock_irqsave(&cwq->lock, flags); spin_lock_irqsave(&cwq->lock, flags);
set_wq_data(work, cwq); set_wq_data(work, cwq);
list_add_tail(&work->entry, &cwq->worklist); list_add_tail(&work->entry, &cwq->worklist);
cwq->insert_sequence++;
wake_up(&cwq->more_work); wake_up(&cwq->more_work);
spin_unlock_irqrestore(&cwq->lock, flags); spin_unlock_irqrestore(&cwq->lock, flags);
} }
...@@ -338,8 +325,6 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq) ...@@ -338,8 +325,6 @@ static void run_workqueue(struct cpu_workqueue_struct *cwq)
} }
spin_lock_irqsave(&cwq->lock, flags); spin_lock_irqsave(&cwq->lock, flags);
cwq->remove_sequence++;
wake_up(&cwq->work_done);
} }
cwq->run_depth--; cwq->run_depth--;
spin_unlock_irqrestore(&cwq->lock, flags); spin_unlock_irqrestore(&cwq->lock, flags);
...@@ -394,6 +379,25 @@ static int worker_thread(void *__cwq) ...@@ -394,6 +379,25 @@ static int worker_thread(void *__cwq)
return 0; return 0;
} }
struct wq_barrier {
struct work_struct work;
struct completion done;
};
static void wq_barrier_func(struct work_struct *work)
{
struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
complete(&barr->done);
}
static inline void init_wq_barrier(struct wq_barrier *barr)
{
INIT_WORK(&barr->work, wq_barrier_func);
__set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));
init_completion(&barr->done);
}
static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{ {
if (cwq->thread == current) { if (cwq->thread == current) {
...@@ -401,23 +405,18 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) ...@@ -401,23 +405,18 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
* Probably keventd trying to flush its own queue. So simply run * Probably keventd trying to flush its own queue. So simply run
* it by hand rather than deadlocking. * it by hand rather than deadlocking.
*/ */
mutex_unlock(&workqueue_mutex);
run_workqueue(cwq); run_workqueue(cwq);
mutex_lock(&workqueue_mutex);
} else { } else {
DEFINE_WAIT(wait); struct wq_barrier barr;
long sequence_needed;
spin_lock_irq(&cwq->lock); init_wq_barrier(&barr);
sequence_needed = cwq->insert_sequence; __queue_work(cwq, &barr.work);
while (sequence_needed - cwq->remove_sequence > 0) { mutex_unlock(&workqueue_mutex);
prepare_to_wait(&cwq->work_done, &wait, wait_for_completion(&barr.done);
TASK_UNINTERRUPTIBLE); mutex_lock(&workqueue_mutex);
spin_unlock_irq(&cwq->lock);
schedule();
spin_lock_irq(&cwq->lock);
}
finish_wait(&cwq->work_done, &wait);
spin_unlock_irq(&cwq->lock);
} }
} }
...@@ -428,29 +427,25 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) ...@@ -428,29 +427,25 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
* Forces execution of the workqueue and blocks until its completion. * Forces execution of the workqueue and blocks until its completion.
* This is typically used in driver shutdown handlers. * This is typically used in driver shutdown handlers.
* *
* This function will sample each workqueue's current insert_sequence number and * We sleep until all works which were queued on entry have been handled,
* will sleep until the head sequence is greater than or equal to that. This * but we are not livelocked by new incoming ones.
* means that we sleep until all works which were queued on entry have been
* handled, but we are not livelocked by new incoming ones.
* *
* This function used to run the workqueues itself. Now we just wait for the * This function used to run the workqueues itself. Now we just wait for the
* helper threads to do it. * helper threads to do it.
*/ */
void fastcall flush_workqueue(struct workqueue_struct *wq) void fastcall flush_workqueue(struct workqueue_struct *wq)
{ {
might_sleep(); mutex_lock(&workqueue_mutex);
if (is_single_threaded(wq)) { if (is_single_threaded(wq)) {
/* Always use first cpu's area. */ /* Always use first cpu's area. */
flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu)); flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
} else { } else {
int cpu; int cpu;
mutex_lock(&workqueue_mutex);
for_each_online_cpu(cpu) for_each_online_cpu(cpu)
flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
mutex_unlock(&workqueue_mutex);
} }
mutex_unlock(&workqueue_mutex);
} }
EXPORT_SYMBOL_GPL(flush_workqueue); EXPORT_SYMBOL_GPL(flush_workqueue);
...@@ -463,12 +458,9 @@ static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq, ...@@ -463,12 +458,9 @@ static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
spin_lock_init(&cwq->lock); spin_lock_init(&cwq->lock);
cwq->wq = wq; cwq->wq = wq;
cwq->thread = NULL; cwq->thread = NULL;
cwq->insert_sequence = 0;
cwq->remove_sequence = 0;
cwq->freezeable = freezeable; cwq->freezeable = freezeable;
INIT_LIST_HEAD(&cwq->worklist); INIT_LIST_HEAD(&cwq->worklist);
init_waitqueue_head(&cwq->more_work); init_waitqueue_head(&cwq->more_work);
init_waitqueue_head(&cwq->work_done);
if (is_single_threaded(wq)) if (is_single_threaded(wq))
p = kthread_create(worker_thread, cwq, "%s", wq->name); p = kthread_create(worker_thread, cwq, "%s", wq->name);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment