Commit 7e11629d authored by Tejun Heo

workqueue: use shared worklist and pool all workers per cpu

Use gcwq->worklist instead of cwq->worklist and break the strict
association between a cwq and its worker.  All works queued on a cpu
are queued on gcwq->worklist and processed by any available worker on
the gcwq.

As there no longer is strict association between a cwq and its worker,
whether a work is executing can now only be determined by calling
[__]find_worker_executing_work().

After this change, the only association between a cwq and its worker
is that a cwq puts a worker into shared worker pool on creation and
kills it on destruction.  As all workqueues are still limited to
max_active of one, this means that there are always at least as many
workers as active works and thus there's no danger for deadlock.

The break of strong association between cwqs and workers requires
somewhat clumsy changes to current_is_keventd() and
destroy_workqueue().  Dynamic worker pool management will remove both
clumsy changes.  current_is_keventd() won't be necessary at all as the
only reason it exists is to avoid queueing a work from within another
work, which will be allowed just fine.  The clumsy part of destroy_workqueue() is
added because a worker can only be destroyed while idle and there's no
guarantee a worker is idle when its wq is going down.  With dynamic
pool management, workers are not associated with workqueues at all and
only idle ones will be submitted to destroy_workqueue() so the code
won't be necessary anymore.
Signed-off-by: Tejun Heo <tj@kernel.org>
parent 18aa9eff
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include <linux/debug_locks.h> #include <linux/debug_locks.h>
#include <linux/lockdep.h> #include <linux/lockdep.h>
#include <linux/idr.h> #include <linux/idr.h>
#include <linux/delay.h>
enum { enum {
/* global_cwq flags */ /* global_cwq flags */
...@@ -72,7 +73,6 @@ enum { ...@@ -72,7 +73,6 @@ enum {
*/ */
struct global_cwq; struct global_cwq;
struct cpu_workqueue_struct;
struct worker { struct worker {
/* on idle list while idle, on busy hash table while busy */ /* on idle list while idle, on busy hash table while busy */
...@@ -86,7 +86,6 @@ struct worker { ...@@ -86,7 +86,6 @@ struct worker {
struct list_head scheduled; /* L: scheduled works */ struct list_head scheduled; /* L: scheduled works */
struct task_struct *task; /* I: worker task */ struct task_struct *task; /* I: worker task */
struct global_cwq *gcwq; /* I: the associated gcwq */ struct global_cwq *gcwq; /* I: the associated gcwq */
struct cpu_workqueue_struct *cwq; /* I: the associated cwq */
unsigned int flags; /* L: flags */ unsigned int flags; /* L: flags */
int id; /* I: worker id */ int id; /* I: worker id */
}; };
...@@ -96,6 +95,7 @@ struct worker { ...@@ -96,6 +95,7 @@ struct worker {
*/ */
struct global_cwq { struct global_cwq {
spinlock_t lock; /* the gcwq lock */ spinlock_t lock; /* the gcwq lock */
struct list_head worklist; /* L: list of pending works */
unsigned int cpu; /* I: the associated cpu */ unsigned int cpu; /* I: the associated cpu */
unsigned int flags; /* L: GCWQ_* flags */ unsigned int flags; /* L: GCWQ_* flags */
...@@ -121,7 +121,6 @@ struct global_cwq { ...@@ -121,7 +121,6 @@ struct global_cwq {
*/ */
struct cpu_workqueue_struct { struct cpu_workqueue_struct {
struct global_cwq *gcwq; /* I: the associated gcwq */ struct global_cwq *gcwq; /* I: the associated gcwq */
struct list_head worklist;
struct worker *worker; struct worker *worker;
struct workqueue_struct *wq; /* I: the owning workqueue */ struct workqueue_struct *wq; /* I: the owning workqueue */
int work_color; /* L: current color */ int work_color; /* L: current color */
...@@ -386,6 +385,32 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work) ...@@ -386,6 +385,32 @@ static struct global_cwq *get_work_gcwq(struct work_struct *work)
return get_gcwq(cpu); return get_gcwq(cpu);
} }
/*
 * Return the worker at the head of @gcwq's idle list, or NULL when the
 * idle list is empty.  Safe with preemption disabled.
 */
static struct worker *first_worker(struct global_cwq *gcwq)
{
	struct list_head *idle = &gcwq->idle_list;

	if (likely(!list_empty(idle)))
		return list_first_entry(idle, struct worker, entry);

	return NULL;
}
/**
 * wake_up_worker - wake up an idle worker
 * @gcwq: gcwq to wake worker for
 *
 * Look up the first idle worker of @gcwq and wake its task.  Nothing
 * is done when @gcwq has no idle worker.
 *
 * CONTEXT:
 * spin_lock_irq(gcwq->lock).
 */
static void wake_up_worker(struct global_cwq *gcwq)
{
	struct worker *idle = first_worker(gcwq);

	/* no idle worker available, nobody to wake */
	if (unlikely(!idle))
		return;

	wake_up_process(idle->task);
}
/** /**
* busy_worker_head - return the busy hash head for a work * busy_worker_head - return the busy hash head for a work
* @gcwq: gcwq of interest * @gcwq: gcwq of interest
...@@ -467,13 +492,14 @@ static struct worker *find_worker_executing_work(struct global_cwq *gcwq, ...@@ -467,13 +492,14 @@ static struct worker *find_worker_executing_work(struct global_cwq *gcwq,
} }
/** /**
* insert_work - insert a work into cwq * insert_work - insert a work into gcwq
* @cwq: cwq @work belongs to * @cwq: cwq @work belongs to
* @work: work to insert * @work: work to insert
* @head: insertion point * @head: insertion point
* @extra_flags: extra WORK_STRUCT_* flags to set * @extra_flags: extra WORK_STRUCT_* flags to set
* *
* Insert @work into @cwq after @head. * Insert @work which belongs to @cwq into @gcwq after @head.
* @extra_flags is or'd to work_struct flags.
* *
* CONTEXT: * CONTEXT:
* spin_lock_irq(gcwq->lock). * spin_lock_irq(gcwq->lock).
...@@ -492,7 +518,7 @@ static void insert_work(struct cpu_workqueue_struct *cwq, ...@@ -492,7 +518,7 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
smp_wmb(); smp_wmb();
list_add_tail(&work->entry, head); list_add_tail(&work->entry, head);
wake_up_process(cwq->worker->task); wake_up_worker(cwq->gcwq);
} }
/** /**
...@@ -608,7 +634,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq, ...@@ -608,7 +634,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
if (likely(cwq->nr_active < cwq->max_active)) { if (likely(cwq->nr_active < cwq->max_active)) {
cwq->nr_active++; cwq->nr_active++;
worklist = &cwq->worklist; worklist = &gcwq->worklist;
} else } else
worklist = &cwq->delayed_works; worklist = &cwq->delayed_works;
...@@ -793,10 +819,10 @@ static struct worker *alloc_worker(void) ...@@ -793,10 +819,10 @@ static struct worker *alloc_worker(void)
/** /**
* create_worker - create a new workqueue worker * create_worker - create a new workqueue worker
* @cwq: cwq the new worker will belong to * @gcwq: gcwq the new worker will belong to
* @bind: whether to set affinity to @cpu or not * @bind: whether to set affinity to @cpu or not
* *
* Create a new worker which is bound to @cwq. The returned worker * Create a new worker which is bound to @gcwq. The returned worker
* can be started by calling start_worker() or destroyed using * can be started by calling start_worker() or destroyed using
* destroy_worker(). * destroy_worker().
* *
...@@ -806,9 +832,8 @@ static struct worker *alloc_worker(void) ...@@ -806,9 +832,8 @@ static struct worker *alloc_worker(void)
* RETURNS: * RETURNS:
* Pointer to the newly created worker. * Pointer to the newly created worker.
*/ */
static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind) static struct worker *create_worker(struct global_cwq *gcwq, bool bind)
{ {
struct global_cwq *gcwq = cwq->gcwq;
int id = -1; int id = -1;
struct worker *worker = NULL; struct worker *worker = NULL;
...@@ -826,7 +851,6 @@ static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind) ...@@ -826,7 +851,6 @@ static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
goto fail; goto fail;
worker->gcwq = gcwq; worker->gcwq = gcwq;
worker->cwq = cwq;
worker->id = id; worker->id = id;
worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d", worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
...@@ -953,7 +977,7 @@ static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq) ...@@ -953,7 +977,7 @@ static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
struct work_struct *work = list_first_entry(&cwq->delayed_works, struct work_struct *work = list_first_entry(&cwq->delayed_works,
struct work_struct, entry); struct work_struct, entry);
move_linked_works(work, &cwq->worklist, NULL); move_linked_works(work, &cwq->gcwq->worklist, NULL);
cwq->nr_active++; cwq->nr_active++;
} }
...@@ -1021,11 +1045,12 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color) ...@@ -1021,11 +1045,12 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
*/ */
static void process_one_work(struct worker *worker, struct work_struct *work) static void process_one_work(struct worker *worker, struct work_struct *work)
{ {
struct cpu_workqueue_struct *cwq = worker->cwq; struct cpu_workqueue_struct *cwq = get_work_cwq(work);
struct global_cwq *gcwq = cwq->gcwq; struct global_cwq *gcwq = cwq->gcwq;
struct hlist_head *bwh = busy_worker_head(gcwq, work); struct hlist_head *bwh = busy_worker_head(gcwq, work);
work_func_t f = work->func; work_func_t f = work->func;
int work_color; int work_color;
struct worker *collision;
#ifdef CONFIG_LOCKDEP #ifdef CONFIG_LOCKDEP
/* /*
* It is permissible to free the struct work_struct from * It is permissible to free the struct work_struct from
...@@ -1036,6 +1061,18 @@ static void process_one_work(struct worker *worker, struct work_struct *work) ...@@ -1036,6 +1061,18 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
*/ */
struct lockdep_map lockdep_map = work->lockdep_map; struct lockdep_map lockdep_map = work->lockdep_map;
#endif #endif
/*
* A single work shouldn't be executed concurrently by
* multiple workers on a single cpu. Check whether anyone is
* already processing the work. If so, defer the work to the
* currently executing one.
*/
collision = __find_worker_executing_work(gcwq, bwh, work);
if (unlikely(collision)) {
move_linked_works(work, &collision->scheduled, NULL);
return;
}
/* claim and process */ /* claim and process */
debug_work_deactivate(work); debug_work_deactivate(work);
hlist_add_head(&worker->hentry, bwh); hlist_add_head(&worker->hentry, bwh);
...@@ -1043,7 +1080,6 @@ static void process_one_work(struct worker *worker, struct work_struct *work) ...@@ -1043,7 +1080,6 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
worker->current_cwq = cwq; worker->current_cwq = cwq;
work_color = get_work_color(work); work_color = get_work_color(work);
BUG_ON(get_work_cwq(work) != cwq);
/* record the current cpu number in the work data and dequeue */ /* record the current cpu number in the work data and dequeue */
set_work_cpu(work, gcwq->cpu); set_work_cpu(work, gcwq->cpu);
list_del_init(&work->entry); list_del_init(&work->entry);
...@@ -1107,7 +1143,6 @@ static int worker_thread(void *__worker) ...@@ -1107,7 +1143,6 @@ static int worker_thread(void *__worker)
{ {
struct worker *worker = __worker; struct worker *worker = __worker;
struct global_cwq *gcwq = worker->gcwq; struct global_cwq *gcwq = worker->gcwq;
struct cpu_workqueue_struct *cwq = worker->cwq;
woke_up: woke_up:
spin_lock_irq(&gcwq->lock); spin_lock_irq(&gcwq->lock);
...@@ -1127,9 +1162,9 @@ static int worker_thread(void *__worker) ...@@ -1127,9 +1162,9 @@ static int worker_thread(void *__worker)
*/ */
BUG_ON(!list_empty(&worker->scheduled)); BUG_ON(!list_empty(&worker->scheduled));
while (!list_empty(&cwq->worklist)) { while (!list_empty(&gcwq->worklist)) {
struct work_struct *work = struct work_struct *work =
list_first_entry(&cwq->worklist, list_first_entry(&gcwq->worklist,
struct work_struct, entry); struct work_struct, entry);
/* /*
...@@ -1844,18 +1879,37 @@ int keventd_up(void) ...@@ -1844,18 +1879,37 @@ int keventd_up(void)
int current_is_keventd(void) int current_is_keventd(void)
{ {
struct cpu_workqueue_struct *cwq; bool found = false;
int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */ unsigned int cpu;
int ret = 0;
BUG_ON(!keventd_wq); /*
* There no longer is one-to-one relation between worker and
* work queue and a worker task might be unbound from its cpu
* if the cpu was offlined. Match all busy workers. This
* function will go away once dynamic pool is implemented.
*/
for_each_possible_cpu(cpu) {
struct global_cwq *gcwq = get_gcwq(cpu);
struct worker *worker;
struct hlist_node *pos;
unsigned long flags;
int i;
cwq = get_cwq(cpu, keventd_wq); spin_lock_irqsave(&gcwq->lock, flags);
if (current == cwq->worker->task)
ret = 1;
return ret; for_each_busy_worker(worker, i, pos, gcwq) {
if (worker->task == current) {
found = true;
break;
}
}
spin_unlock_irqrestore(&gcwq->lock, flags);
if (found)
break;
}
return found;
} }
static struct cpu_workqueue_struct *alloc_cwqs(void) static struct cpu_workqueue_struct *alloc_cwqs(void)
...@@ -1953,12 +2007,11 @@ struct workqueue_struct *__create_workqueue_key(const char *name, ...@@ -1953,12 +2007,11 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
cwq->wq = wq; cwq->wq = wq;
cwq->flush_color = -1; cwq->flush_color = -1;
cwq->max_active = max_active; cwq->max_active = max_active;
INIT_LIST_HEAD(&cwq->worklist);
INIT_LIST_HEAD(&cwq->delayed_works); INIT_LIST_HEAD(&cwq->delayed_works);
if (failed) if (failed)
continue; continue;
cwq->worker = create_worker(cwq, cpu_online(cpu)); cwq->worker = create_worker(gcwq, cpu_online(cpu));
if (cwq->worker) if (cwq->worker)
start_worker(cwq->worker); start_worker(cwq->worker);
else else
...@@ -2020,13 +2073,26 @@ void destroy_workqueue(struct workqueue_struct *wq) ...@@ -2020,13 +2073,26 @@ void destroy_workqueue(struct workqueue_struct *wq)
for_each_possible_cpu(cpu) { for_each_possible_cpu(cpu) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
struct global_cwq *gcwq = cwq->gcwq;
int i; int i;
if (cwq->worker) { if (cwq->worker) {
spin_lock_irq(&cwq->gcwq->lock); retry:
spin_lock_irq(&gcwq->lock);
/*
* Worker can only be destroyed while idle.
* Wait till it becomes idle. This is ugly
* and prone to starvation. It will go away
* once dynamic worker pool is implemented.
*/
if (!(cwq->worker->flags & WORKER_IDLE)) {
spin_unlock_irq(&gcwq->lock);
msleep(100);
goto retry;
}
destroy_worker(cwq->worker); destroy_worker(cwq->worker);
cwq->worker = NULL; cwq->worker = NULL;
spin_unlock_irq(&cwq->gcwq->lock); spin_unlock_irq(&gcwq->lock);
} }
for (i = 0; i < WORK_NR_COLORS; i++) for (i = 0; i < WORK_NR_COLORS; i++)
...@@ -2324,7 +2390,7 @@ EXPORT_SYMBOL_GPL(work_on_cpu); ...@@ -2324,7 +2390,7 @@ EXPORT_SYMBOL_GPL(work_on_cpu);
* *
* Start freezing workqueues. After this function returns, all * Start freezing workqueues. After this function returns, all
* freezeable workqueues will queue new works to their frozen_works * freezeable workqueues will queue new works to their frozen_works
* list instead of the cwq ones. * list instead of gcwq->worklist.
* *
* CONTEXT: * CONTEXT:
* Grabs and releases workqueue_lock and gcwq->lock's. * Grabs and releases workqueue_lock and gcwq->lock's.
...@@ -2410,7 +2476,7 @@ bool freeze_workqueues_busy(void) ...@@ -2410,7 +2476,7 @@ bool freeze_workqueues_busy(void)
* thaw_workqueues - thaw workqueues * thaw_workqueues - thaw workqueues
* *
* Thaw workqueues. Normal queueing is restored and all collected * Thaw workqueues. Normal queueing is restored and all collected
* frozen works are transferred to their respective cwq worklists. * frozen works are transferred to their respective gcwq worklists.
* *
* CONTEXT: * CONTEXT:
* Grabs and releases workqueue_lock and gcwq->lock's. * Grabs and releases workqueue_lock and gcwq->lock's.
...@@ -2483,6 +2549,7 @@ void __init init_workqueues(void) ...@@ -2483,6 +2549,7 @@ void __init init_workqueues(void)
struct global_cwq *gcwq = get_gcwq(cpu); struct global_cwq *gcwq = get_gcwq(cpu);
spin_lock_init(&gcwq->lock); spin_lock_init(&gcwq->lock);
INIT_LIST_HEAD(&gcwq->worklist);
gcwq->cpu = cpu; gcwq->cpu = cpu;
INIT_LIST_HEAD(&gcwq->idle_list); INIT_LIST_HEAD(&gcwq->idle_list);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment