Commit 502ca9d8 authored by Tejun Heo's avatar Tejun Heo

workqueue: make single thread workqueue shared worker pool friendly

Reimplement st (single thread) workqueue so that it's friendly to
shared worker pool.  It was originally implemented by confining st
workqueues to use cwq of a fixed cpu and always having a worker for
the cpu.  This implementation isn't very friendly to shared worker
pool and suboptimal in that it ends up crossing cpu boundaries often.

Reimplement st workqueue using dynamic single cpu binding and
cwq->limit.  WQ_SINGLE_THREAD is replaced with WQ_SINGLE_CPU.  In a
single cpu workqueue, at most single cwq is bound to the wq at any
given time.  Arbitration is done using atomic accesses to
wq->single_cpu when queueing a work.  Once bound, the binding stays
till the workqueue is drained.

Note that the binding is never broken while a workqueue is frozen.
This is because idle cwqs may have works waiting in delayed_works
queue while frozen.  On thaw, the cwq is restarted if there are any
delayed works or unbound otherwise.

When combined with max_active limit of 1, single cpu workqueue has
exactly the same execution properties as the original single thread
workqueue while allowing sharing of per-cpu workers.
Signed-off-by: default avatarTejun Heo <tj@kernel.org>
parent db7bccf4
...@@ -221,7 +221,7 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; } ...@@ -221,7 +221,7 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; }
enum { enum {
WQ_FREEZEABLE = 1 << 0, /* freeze during suspend */ WQ_FREEZEABLE = 1 << 0, /* freeze during suspend */
WQ_SINGLE_THREAD = 1 << 1, /* no per-cpu worker */ WQ_SINGLE_CPU = 1 << 1, /* only single cpu at a time */
}; };
extern struct workqueue_struct * extern struct workqueue_struct *
...@@ -250,9 +250,9 @@ __create_workqueue_key(const char *name, unsigned int flags, int max_active, ...@@ -250,9 +250,9 @@ __create_workqueue_key(const char *name, unsigned int flags, int max_active,
#define create_workqueue(name) \ #define create_workqueue(name) \
__create_workqueue((name), 0, 1) __create_workqueue((name), 0, 1)
#define create_freezeable_workqueue(name) \ #define create_freezeable_workqueue(name) \
__create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_THREAD, 1) __create_workqueue((name), WQ_FREEZEABLE | WQ_SINGLE_CPU, 1)
#define create_singlethread_workqueue(name) \ #define create_singlethread_workqueue(name) \
__create_workqueue((name), WQ_SINGLE_THREAD, 1) __create_workqueue((name), WQ_SINGLE_CPU, 1)
extern void destroy_workqueue(struct workqueue_struct *wq); extern void destroy_workqueue(struct workqueue_struct *wq);
......
...@@ -114,8 +114,7 @@ struct global_cwq { ...@@ -114,8 +114,7 @@ struct global_cwq {
} ____cacheline_aligned_in_smp; } ____cacheline_aligned_in_smp;
/* /*
* The per-CPU workqueue (if single thread, we always use the first * The per-CPU workqueue. The lower WORK_STRUCT_FLAG_BITS of
* possible cpu). The lower WORK_STRUCT_FLAG_BITS of
* work_struct->data are used for flags and thus cwqs need to be * work_struct->data are used for flags and thus cwqs need to be
* aligned at two's power of the number of flag bits. * aligned at two's power of the number of flag bits.
*/ */
...@@ -159,6 +158,8 @@ struct workqueue_struct { ...@@ -159,6 +158,8 @@ struct workqueue_struct {
struct list_head flusher_queue; /* F: flush waiters */ struct list_head flusher_queue; /* F: flush waiters */
struct list_head flusher_overflow; /* F: flush overflow list */ struct list_head flusher_overflow; /* F: flush overflow list */
unsigned long single_cpu; /* cpu for single cpu wq */
int saved_max_active; /* I: saved cwq max_active */ int saved_max_active; /* I: saved cwq max_active */
const char *name; /* I: workqueue name */ const char *name; /* I: workqueue name */
#ifdef CONFIG_LOCKDEP #ifdef CONFIG_LOCKDEP
...@@ -289,8 +290,6 @@ static DEFINE_PER_CPU(struct global_cwq, global_cwq); ...@@ -289,8 +290,6 @@ static DEFINE_PER_CPU(struct global_cwq, global_cwq);
static int worker_thread(void *__worker); static int worker_thread(void *__worker);
static int singlethread_cpu __read_mostly;
static struct global_cwq *get_gcwq(unsigned int cpu) static struct global_cwq *get_gcwq(unsigned int cpu)
{ {
return &per_cpu(global_cwq, cpu); return &per_cpu(global_cwq, cpu);
...@@ -302,14 +301,6 @@ static struct cpu_workqueue_struct *get_cwq(unsigned int cpu, ...@@ -302,14 +301,6 @@ static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
return per_cpu_ptr(wq->cpu_wq, cpu); return per_cpu_ptr(wq->cpu_wq, cpu);
} }
static struct cpu_workqueue_struct *target_cwq(unsigned int cpu,
struct workqueue_struct *wq)
{
if (unlikely(wq->flags & WQ_SINGLE_THREAD))
cpu = singlethread_cpu;
return get_cwq(cpu, wq);
}
static unsigned int work_color_to_flags(int color) static unsigned int work_color_to_flags(int color)
{ {
return color << WORK_STRUCT_COLOR_SHIFT; return color << WORK_STRUCT_COLOR_SHIFT;
...@@ -410,17 +401,87 @@ static void insert_work(struct cpu_workqueue_struct *cwq, ...@@ -410,17 +401,87 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
wake_up_process(cwq->worker->task); wake_up_process(cwq->worker->task);
} }
/**
* cwq_unbind_single_cpu - unbind cwq from single cpu workqueue processing
* @cwq: cwq to unbind
*
* Try to unbind @cwq from single cpu workqueue processing. If
* @cwq->wq is frozen, unbind is delayed till the workqueue is thawed.
*
* CONTEXT:
* spin_lock_irq(gcwq->lock).
*/
static void cwq_unbind_single_cpu(struct cpu_workqueue_struct *cwq)
{
struct workqueue_struct *wq = cwq->wq;
struct global_cwq *gcwq = cwq->gcwq;
BUG_ON(wq->single_cpu != gcwq->cpu);
/*
* Unbind from workqueue if @cwq is not frozen. If frozen,
* thaw_workqueues() will either restart processing on this
* cpu or unbind if empty. This keeps works queued while
* frozen fully ordered and flushable.
*/
if (likely(!(gcwq->flags & GCWQ_FREEZING))) {
smp_wmb(); /* paired with cmpxchg() in __queue_work() */
wq->single_cpu = NR_CPUS;
}
}
static void __queue_work(unsigned int cpu, struct workqueue_struct *wq, static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
struct work_struct *work) struct work_struct *work)
{ {
struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq); struct global_cwq *gcwq;
struct global_cwq *gcwq = cwq->gcwq; struct cpu_workqueue_struct *cwq;
struct list_head *worklist; struct list_head *worklist;
unsigned long flags; unsigned long flags;
bool arbitrate;
debug_work_activate(work); debug_work_activate(work);
spin_lock_irqsave(&gcwq->lock, flags); /* determine gcwq to use */
if (!(wq->flags & WQ_SINGLE_CPU)) {
/* just use the requested cpu for multicpu workqueues */
gcwq = get_gcwq(cpu);
spin_lock_irqsave(&gcwq->lock, flags);
} else {
unsigned int req_cpu = cpu;
/*
* It's a bit more complex for single cpu workqueues.
* We first need to determine which cpu is going to be
* used. If no cpu is currently serving this
* workqueue, arbitrate using atomic accesses to
* wq->single_cpu; otherwise, use the current one.
*/
retry:
cpu = wq->single_cpu;
arbitrate = cpu == NR_CPUS;
if (arbitrate)
cpu = req_cpu;
gcwq = get_gcwq(cpu);
spin_lock_irqsave(&gcwq->lock, flags);
/*
* The following cmpxchg() is a full barrier paired
* with smp_wmb() in cwq_unbind_single_cpu() and
* guarantees that all changes to wq->st_* fields are
* visible on the new cpu after this point.
*/
if (arbitrate)
cmpxchg(&wq->single_cpu, NR_CPUS, cpu);
if (unlikely(wq->single_cpu != cpu)) {
spin_unlock_irqrestore(&gcwq->lock, flags);
goto retry;
}
}
/* gcwq determined, get cwq and queue */
cwq = get_cwq(gcwq->cpu, wq);
BUG_ON(!list_empty(&work->entry)); BUG_ON(!list_empty(&work->entry));
cwq->nr_in_flight[cwq->work_color]++; cwq->nr_in_flight[cwq->work_color]++;
...@@ -530,7 +591,7 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq, ...@@ -530,7 +591,7 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
timer_stats_timer_set_start_info(&dwork->timer); timer_stats_timer_set_start_info(&dwork->timer);
/* This stores cwq for the moment, for the timer_fn */ /* This stores cwq for the moment, for the timer_fn */
set_wq_data(work, target_cwq(raw_smp_processor_id(), wq), 0); set_wq_data(work, get_cwq(raw_smp_processor_id(), wq), 0);
timer->expires = jiffies + delay; timer->expires = jiffies + delay;
timer->data = (unsigned long)dwork; timer->data = (unsigned long)dwork;
timer->function = delayed_work_timer_fn; timer->function = delayed_work_timer_fn;
...@@ -790,10 +851,14 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color) ...@@ -790,10 +851,14 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
cwq->nr_in_flight[color]--; cwq->nr_in_flight[color]--;
cwq->nr_active--; cwq->nr_active--;
/* one down, submit a delayed one */ if (!list_empty(&cwq->delayed_works)) {
if (!list_empty(&cwq->delayed_works) && /* one down, submit a delayed one */
cwq->nr_active < cwq->max_active) if (cwq->nr_active < cwq->max_active)
cwq_activate_first_delayed(cwq); cwq_activate_first_delayed(cwq);
} else if (!cwq->nr_active && cwq->wq->flags & WQ_SINGLE_CPU) {
/* this was the last work, unbind from single cpu */
cwq_unbind_single_cpu(cwq);
}
/* is flush in progress and are we at the flushing tip? */ /* is flush in progress and are we at the flushing tip? */
if (likely(cwq->flush_color != color)) if (likely(cwq->flush_color != color))
...@@ -1727,7 +1792,6 @@ struct workqueue_struct *__create_workqueue_key(const char *name, ...@@ -1727,7 +1792,6 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
struct lock_class_key *key, struct lock_class_key *key,
const char *lock_name) const char *lock_name)
{ {
bool singlethread = flags & WQ_SINGLE_THREAD;
struct workqueue_struct *wq; struct workqueue_struct *wq;
bool failed = false; bool failed = false;
unsigned int cpu; unsigned int cpu;
...@@ -1748,6 +1812,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name, ...@@ -1748,6 +1812,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
atomic_set(&wq->nr_cwqs_to_flush, 0); atomic_set(&wq->nr_cwqs_to_flush, 0);
INIT_LIST_HEAD(&wq->flusher_queue); INIT_LIST_HEAD(&wq->flusher_queue);
INIT_LIST_HEAD(&wq->flusher_overflow); INIT_LIST_HEAD(&wq->flusher_overflow);
wq->single_cpu = NR_CPUS;
wq->name = name; wq->name = name;
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
INIT_LIST_HEAD(&wq->list); INIT_LIST_HEAD(&wq->list);
...@@ -1773,8 +1839,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name, ...@@ -1773,8 +1839,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
if (failed) if (failed)
continue; continue;
cwq->worker = create_worker(cwq, cwq->worker = create_worker(cwq, cpu_online(cpu));
cpu_online(cpu) && !singlethread);
if (cwq->worker) if (cwq->worker)
start_worker(cwq->worker); start_worker(cwq->worker);
else else
...@@ -1958,18 +2023,16 @@ static int __cpuinit trustee_thread(void *__gcwq) ...@@ -1958,18 +2023,16 @@ static int __cpuinit trustee_thread(void *__gcwq)
spin_lock_irq(&gcwq->lock); spin_lock_irq(&gcwq->lock);
/* /*
* Make all multithread workers rogue. Trustee must be bound * Make all workers rogue. Trustee must be bound to the
* to the target cpu and can't be cancelled. * target cpu and can't be cancelled.
*/ */
BUG_ON(gcwq->cpu != smp_processor_id()); BUG_ON(gcwq->cpu != smp_processor_id());
list_for_each_entry(worker, &gcwq->idle_list, entry) list_for_each_entry(worker, &gcwq->idle_list, entry)
if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD)) worker->flags |= WORKER_ROGUE;
worker->flags |= WORKER_ROGUE;
for_each_busy_worker(worker, i, pos, gcwq) for_each_busy_worker(worker, i, pos, gcwq)
if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD)) worker->flags |= WORKER_ROGUE;
worker->flags |= WORKER_ROGUE;
/* /*
* We're now in charge. Notify and proceed to drain. We need * We're now in charge. Notify and proceed to drain. We need
...@@ -2074,14 +2137,12 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, ...@@ -2074,14 +2137,12 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
wait_trustee_state(gcwq, TRUSTEE_DONE); wait_trustee_state(gcwq, TRUSTEE_DONE);
} }
/* clear ROGUE from all multithread workers */ /* clear ROGUE from all workers */
list_for_each_entry(worker, &gcwq->idle_list, entry) list_for_each_entry(worker, &gcwq->idle_list, entry)
if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD)) worker->flags &= ~WORKER_ROGUE;
worker->flags &= ~WORKER_ROGUE;
for_each_busy_worker(worker, i, pos, gcwq) for_each_busy_worker(worker, i, pos, gcwq)
if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD)) worker->flags &= ~WORKER_ROGUE;
worker->flags &= ~WORKER_ROGUE;
break; break;
} }
...@@ -2266,6 +2327,11 @@ void thaw_workqueues(void) ...@@ -2266,6 +2327,11 @@ void thaw_workqueues(void)
cwq->nr_active < cwq->max_active) cwq->nr_active < cwq->max_active)
cwq_activate_first_delayed(cwq); cwq_activate_first_delayed(cwq);
/* perform delayed unbind from single cpu if empty */
if (wq->single_cpu == gcwq->cpu &&
!cwq->nr_active && list_empty(&cwq->delayed_works))
cwq_unbind_single_cpu(cwq);
wake_up_process(cwq->worker->task); wake_up_process(cwq->worker->task);
} }
...@@ -2283,7 +2349,6 @@ void __init init_workqueues(void) ...@@ -2283,7 +2349,6 @@ void __init init_workqueues(void)
unsigned int cpu; unsigned int cpu;
int i; int i;
singlethread_cpu = cpumask_first(cpu_possible_mask);
hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE); hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
/* initialize gcwqs */ /* initialize gcwqs */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment