Commit 4ce62e9e authored by Tejun Heo's avatar Tejun Heo

workqueue: introduce NR_WORKER_POOLS and for_each_worker_pool()

Introduce NR_WORKER_POOLS and for_each_worker_pool() and convert code
paths which need to manipulate all pools in a gcwq to use them.
NR_WORKER_POOLS is currently one and for_each_worker_pool() iterates
over only @gcwq->pool.

Note that nr_running is per-pool property and converted to an array
with NR_WORKER_POOLS elements and renamed to pool_nr_running.  Note
that get_pool_nr_running() currently assumes 0 index.  The next patch
will make use of non-zero index.

The changes in this patch are mechanical and don't caues any
functional difference.  This is to prepare for multiple pools per
gcwq.

v2: nr_running indexing bug in get_pool_nr_running() fixed.

v3: Pointer to array is stupid.  Don't use it in get_pool_nr_running()
    as suggested by Linus.
Signed-off-by: default avatarTejun Heo <tj@kernel.org>
Cc: Tony Luck <tony.luck@intel.com>
Cc: Fengguang Wu <fengguang.wu@intel.com>
Cc: Linus Torvalds <torvalds@linux-foundation.org>
parent 11ebea50
...@@ -74,6 +74,8 @@ enum { ...@@ -74,6 +74,8 @@ enum {
TRUSTEE_RELEASE = 3, /* release workers */ TRUSTEE_RELEASE = 3, /* release workers */
TRUSTEE_DONE = 4, /* trustee is done */ TRUSTEE_DONE = 4, /* trustee is done */
NR_WORKER_POOLS = 1, /* # worker pools per gcwq */
BUSY_WORKER_HASH_ORDER = 6, /* 64 pointers */ BUSY_WORKER_HASH_ORDER = 6, /* 64 pointers */
BUSY_WORKER_HASH_SIZE = 1 << BUSY_WORKER_HASH_ORDER, BUSY_WORKER_HASH_SIZE = 1 << BUSY_WORKER_HASH_ORDER,
BUSY_WORKER_HASH_MASK = BUSY_WORKER_HASH_SIZE - 1, BUSY_WORKER_HASH_MASK = BUSY_WORKER_HASH_SIZE - 1,
...@@ -274,6 +276,9 @@ EXPORT_SYMBOL_GPL(system_nrt_freezable_wq); ...@@ -274,6 +276,9 @@ EXPORT_SYMBOL_GPL(system_nrt_freezable_wq);
#define CREATE_TRACE_POINTS #define CREATE_TRACE_POINTS
#include <trace/events/workqueue.h> #include <trace/events/workqueue.h>
#define for_each_worker_pool(pool, gcwq) \
for ((pool) = &(gcwq)->pool; (pool); (pool) = NULL)
#define for_each_busy_worker(worker, i, pos, gcwq) \ #define for_each_busy_worker(worker, i, pos, gcwq) \
for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) \ for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) \
hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry) hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)
...@@ -454,7 +459,7 @@ static bool workqueue_freezing; /* W: have wqs started freezing? */ ...@@ -454,7 +459,7 @@ static bool workqueue_freezing; /* W: have wqs started freezing? */
* try_to_wake_up(). Put it in a separate cacheline. * try_to_wake_up(). Put it in a separate cacheline.
*/ */
static DEFINE_PER_CPU(struct global_cwq, global_cwq); static DEFINE_PER_CPU(struct global_cwq, global_cwq);
static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, gcwq_nr_running); static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, pool_nr_running[NR_WORKER_POOLS]);
/* /*
* Global cpu workqueue and nr_running counter for unbound gcwq. The * Global cpu workqueue and nr_running counter for unbound gcwq. The
...@@ -462,7 +467,9 @@ static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, gcwq_nr_running); ...@@ -462,7 +467,9 @@ static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, gcwq_nr_running);
* workers have WORKER_UNBOUND set. * workers have WORKER_UNBOUND set.
*/ */
static struct global_cwq unbound_global_cwq; static struct global_cwq unbound_global_cwq;
static atomic_t unbound_gcwq_nr_running = ATOMIC_INIT(0); /* always 0 */ static atomic_t unbound_pool_nr_running[NR_WORKER_POOLS] = {
[0 ... NR_WORKER_POOLS - 1] = ATOMIC_INIT(0), /* always 0 */
};
static int worker_thread(void *__worker); static int worker_thread(void *__worker);
...@@ -477,11 +484,12 @@ static struct global_cwq *get_gcwq(unsigned int cpu) ...@@ -477,11 +484,12 @@ static struct global_cwq *get_gcwq(unsigned int cpu)
static atomic_t *get_pool_nr_running(struct worker_pool *pool) static atomic_t *get_pool_nr_running(struct worker_pool *pool)
{ {
int cpu = pool->gcwq->cpu; int cpu = pool->gcwq->cpu;
int idx = 0;
if (cpu != WORK_CPU_UNBOUND) if (cpu != WORK_CPU_UNBOUND)
return &per_cpu(gcwq_nr_running, cpu); return &per_cpu(pool_nr_running, cpu)[idx];
else else
return &unbound_gcwq_nr_running; return &unbound_pool_nr_running[idx];
} }
static struct cpu_workqueue_struct *get_cwq(unsigned int cpu, static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
...@@ -3345,9 +3353,30 @@ EXPORT_SYMBOL_GPL(work_busy); ...@@ -3345,9 +3353,30 @@ EXPORT_SYMBOL_GPL(work_busy);
__ret1 < 0 ? -1 : 0; \ __ret1 < 0 ? -1 : 0; \
}) })
static bool gcwq_is_managing_workers(struct global_cwq *gcwq)
{
struct worker_pool *pool;
for_each_worker_pool(pool, gcwq)
if (pool->flags & POOL_MANAGING_WORKERS)
return true;
return false;
}
static bool gcwq_has_idle_workers(struct global_cwq *gcwq)
{
struct worker_pool *pool;
for_each_worker_pool(pool, gcwq)
if (!list_empty(&pool->idle_list))
return true;
return false;
}
static int __cpuinit trustee_thread(void *__gcwq) static int __cpuinit trustee_thread(void *__gcwq)
{ {
struct global_cwq *gcwq = __gcwq; struct global_cwq *gcwq = __gcwq;
struct worker_pool *pool;
struct worker *worker; struct worker *worker;
struct work_struct *work; struct work_struct *work;
struct hlist_node *pos; struct hlist_node *pos;
...@@ -3363,13 +3392,15 @@ static int __cpuinit trustee_thread(void *__gcwq) ...@@ -3363,13 +3392,15 @@ static int __cpuinit trustee_thread(void *__gcwq)
* cancelled. * cancelled.
*/ */
BUG_ON(gcwq->cpu != smp_processor_id()); BUG_ON(gcwq->cpu != smp_processor_id());
rc = trustee_wait_event(!(gcwq->pool.flags & POOL_MANAGING_WORKERS)); rc = trustee_wait_event(!gcwq_is_managing_workers(gcwq));
BUG_ON(rc < 0); BUG_ON(rc < 0);
gcwq->pool.flags |= POOL_MANAGING_WORKERS; for_each_worker_pool(pool, gcwq) {
pool->flags |= POOL_MANAGING_WORKERS;
list_for_each_entry(worker, &gcwq->pool.idle_list, entry) list_for_each_entry(worker, &pool->idle_list, entry)
worker->flags |= WORKER_ROGUE; worker->flags |= WORKER_ROGUE;
}
for_each_busy_worker(worker, i, pos, gcwq) for_each_busy_worker(worker, i, pos, gcwq)
worker->flags |= WORKER_ROGUE; worker->flags |= WORKER_ROGUE;
...@@ -3390,10 +3421,12 @@ static int __cpuinit trustee_thread(void *__gcwq) ...@@ -3390,10 +3421,12 @@ static int __cpuinit trustee_thread(void *__gcwq)
* keep_working() are always true as long as the worklist is * keep_working() are always true as long as the worklist is
* not empty. * not empty.
*/ */
atomic_set(get_pool_nr_running(&gcwq->pool), 0); for_each_worker_pool(pool, gcwq)
atomic_set(get_pool_nr_running(pool), 0);
spin_unlock_irq(&gcwq->lock); spin_unlock_irq(&gcwq->lock);
del_timer_sync(&gcwq->pool.idle_timer); for_each_worker_pool(pool, gcwq)
del_timer_sync(&pool->idle_timer);
spin_lock_irq(&gcwq->lock); spin_lock_irq(&gcwq->lock);
/* /*
...@@ -3415,29 +3448,38 @@ static int __cpuinit trustee_thread(void *__gcwq) ...@@ -3415,29 +3448,38 @@ static int __cpuinit trustee_thread(void *__gcwq)
* may be frozen works in freezable cwqs. Don't declare * may be frozen works in freezable cwqs. Don't declare
* completion while frozen. * completion while frozen.
*/ */
while (gcwq->pool.nr_workers != gcwq->pool.nr_idle || while (true) {
gcwq->flags & GCWQ_FREEZING || bool busy = false;
gcwq->trustee_state == TRUSTEE_IN_CHARGE) {
int nr_works = 0;
list_for_each_entry(work, &gcwq->pool.worklist, entry) { for_each_worker_pool(pool, gcwq)
send_mayday(work); busy |= pool->nr_workers != pool->nr_idle;
nr_works++;
}
list_for_each_entry(worker, &gcwq->pool.idle_list, entry) { if (!busy && !(gcwq->flags & GCWQ_FREEZING) &&
if (!nr_works--) gcwq->trustee_state != TRUSTEE_IN_CHARGE)
break; break;
wake_up_process(worker->task);
}
if (need_to_create_worker(&gcwq->pool)) { for_each_worker_pool(pool, gcwq) {
spin_unlock_irq(&gcwq->lock); int nr_works = 0;
worker = create_worker(&gcwq->pool, false);
spin_lock_irq(&gcwq->lock); list_for_each_entry(work, &pool->worklist, entry) {
if (worker) { send_mayday(work);
worker->flags |= WORKER_ROGUE; nr_works++;
start_worker(worker); }
list_for_each_entry(worker, &pool->idle_list, entry) {
if (!nr_works--)
break;
wake_up_process(worker->task);
}
if (need_to_create_worker(pool)) {
spin_unlock_irq(&gcwq->lock);
worker = create_worker(pool, false);
spin_lock_irq(&gcwq->lock);
if (worker) {
worker->flags |= WORKER_ROGUE;
start_worker(worker);
}
} }
} }
...@@ -3452,11 +3494,18 @@ static int __cpuinit trustee_thread(void *__gcwq) ...@@ -3452,11 +3494,18 @@ static int __cpuinit trustee_thread(void *__gcwq)
* all workers till we're canceled. * all workers till we're canceled.
*/ */
do { do {
rc = trustee_wait_event(!list_empty(&gcwq->pool.idle_list)); rc = trustee_wait_event(gcwq_has_idle_workers(gcwq));
while (!list_empty(&gcwq->pool.idle_list))
destroy_worker(list_first_entry(&gcwq->pool.idle_list, i = 0;
struct worker, entry)); for_each_worker_pool(pool, gcwq) {
} while (gcwq->pool.nr_workers && rc >= 0); while (!list_empty(&pool->idle_list)) {
worker = list_first_entry(&pool->idle_list,
struct worker, entry);
destroy_worker(worker);
}
i |= pool->nr_workers;
}
} while (i && rc >= 0);
/* /*
* At this point, either draining has completed and no worker * At this point, either draining has completed and no worker
...@@ -3465,7 +3514,8 @@ static int __cpuinit trustee_thread(void *__gcwq) ...@@ -3465,7 +3514,8 @@ static int __cpuinit trustee_thread(void *__gcwq)
* Tell the remaining busy ones to rebind once it finishes the * Tell the remaining busy ones to rebind once it finishes the
* currently scheduled works by scheduling the rebind_work. * currently scheduled works by scheduling the rebind_work.
*/ */
WARN_ON(!list_empty(&gcwq->pool.idle_list)); for_each_worker_pool(pool, gcwq)
WARN_ON(!list_empty(&pool->idle_list));
for_each_busy_worker(worker, i, pos, gcwq) { for_each_busy_worker(worker, i, pos, gcwq) {
struct work_struct *rebind_work = &worker->rebind_work; struct work_struct *rebind_work = &worker->rebind_work;
...@@ -3490,7 +3540,8 @@ static int __cpuinit trustee_thread(void *__gcwq) ...@@ -3490,7 +3540,8 @@ static int __cpuinit trustee_thread(void *__gcwq)
} }
/* relinquish manager role */ /* relinquish manager role */
gcwq->pool.flags &= ~POOL_MANAGING_WORKERS; for_each_worker_pool(pool, gcwq)
pool->flags &= ~POOL_MANAGING_WORKERS;
/* notify completion */ /* notify completion */
gcwq->trustee = NULL; gcwq->trustee = NULL;
...@@ -3532,8 +3583,10 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, ...@@ -3532,8 +3583,10 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
unsigned int cpu = (unsigned long)hcpu; unsigned int cpu = (unsigned long)hcpu;
struct global_cwq *gcwq = get_gcwq(cpu); struct global_cwq *gcwq = get_gcwq(cpu);
struct task_struct *new_trustee = NULL; struct task_struct *new_trustee = NULL;
struct worker *uninitialized_var(new_worker); struct worker *new_workers[NR_WORKER_POOLS] = { };
struct worker_pool *pool;
unsigned long flags; unsigned long flags;
int i;
action &= ~CPU_TASKS_FROZEN; action &= ~CPU_TASKS_FROZEN;
...@@ -3546,12 +3599,12 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, ...@@ -3546,12 +3599,12 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
kthread_bind(new_trustee, cpu); kthread_bind(new_trustee, cpu);
/* fall through */ /* fall through */
case CPU_UP_PREPARE: case CPU_UP_PREPARE:
BUG_ON(gcwq->pool.first_idle); i = 0;
new_worker = create_worker(&gcwq->pool, false); for_each_worker_pool(pool, gcwq) {
if (!new_worker) { BUG_ON(pool->first_idle);
if (new_trustee) new_workers[i] = create_worker(pool, false);
kthread_stop(new_trustee); if (!new_workers[i++])
return NOTIFY_BAD; goto err_destroy;
} }
} }
...@@ -3568,8 +3621,11 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, ...@@ -3568,8 +3621,11 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE); wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
/* fall through */ /* fall through */
case CPU_UP_PREPARE: case CPU_UP_PREPARE:
BUG_ON(gcwq->pool.first_idle); i = 0;
gcwq->pool.first_idle = new_worker; for_each_worker_pool(pool, gcwq) {
BUG_ON(pool->first_idle);
pool->first_idle = new_workers[i++];
}
break; break;
case CPU_DYING: case CPU_DYING:
...@@ -3586,8 +3642,10 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, ...@@ -3586,8 +3642,10 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
gcwq->trustee_state = TRUSTEE_BUTCHER; gcwq->trustee_state = TRUSTEE_BUTCHER;
/* fall through */ /* fall through */
case CPU_UP_CANCELED: case CPU_UP_CANCELED:
destroy_worker(gcwq->pool.first_idle); for_each_worker_pool(pool, gcwq) {
gcwq->pool.first_idle = NULL; destroy_worker(pool->first_idle);
pool->first_idle = NULL;
}
break; break;
case CPU_DOWN_FAILED: case CPU_DOWN_FAILED:
...@@ -3604,18 +3662,32 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, ...@@ -3604,18 +3662,32 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
* Put the first_idle in and request a real manager to * Put the first_idle in and request a real manager to
* take a look. * take a look.
*/ */
spin_unlock_irq(&gcwq->lock); for_each_worker_pool(pool, gcwq) {
kthread_bind(gcwq->pool.first_idle->task, cpu); spin_unlock_irq(&gcwq->lock);
spin_lock_irq(&gcwq->lock); kthread_bind(pool->first_idle->task, cpu);
gcwq->pool.flags |= POOL_MANAGE_WORKERS; spin_lock_irq(&gcwq->lock);
start_worker(gcwq->pool.first_idle); pool->flags |= POOL_MANAGE_WORKERS;
gcwq->pool.first_idle = NULL; start_worker(pool->first_idle);
pool->first_idle = NULL;
}
break; break;
} }
spin_unlock_irqrestore(&gcwq->lock, flags); spin_unlock_irqrestore(&gcwq->lock, flags);
return notifier_from_errno(0); return notifier_from_errno(0);
err_destroy:
if (new_trustee)
kthread_stop(new_trustee);
spin_lock_irqsave(&gcwq->lock, flags);
for (i = 0; i < NR_WORKER_POOLS; i++)
if (new_workers[i])
destroy_worker(new_workers[i]);
spin_unlock_irqrestore(&gcwq->lock, flags);
return NOTIFY_BAD;
} }
#ifdef CONFIG_SMP #ifdef CONFIG_SMP
...@@ -3774,6 +3846,7 @@ void thaw_workqueues(void) ...@@ -3774,6 +3846,7 @@ void thaw_workqueues(void)
for_each_gcwq_cpu(cpu) { for_each_gcwq_cpu(cpu) {
struct global_cwq *gcwq = get_gcwq(cpu); struct global_cwq *gcwq = get_gcwq(cpu);
struct worker_pool *pool;
struct workqueue_struct *wq; struct workqueue_struct *wq;
spin_lock_irq(&gcwq->lock); spin_lock_irq(&gcwq->lock);
...@@ -3795,7 +3868,8 @@ void thaw_workqueues(void) ...@@ -3795,7 +3868,8 @@ void thaw_workqueues(void)
cwq_activate_first_delayed(cwq); cwq_activate_first_delayed(cwq);
} }
wake_up_worker(&gcwq->pool); for_each_worker_pool(pool, gcwq)
wake_up_worker(pool);
spin_unlock_irq(&gcwq->lock); spin_unlock_irq(&gcwq->lock);
} }
...@@ -3816,25 +3890,29 @@ static int __init init_workqueues(void) ...@@ -3816,25 +3890,29 @@ static int __init init_workqueues(void)
/* initialize gcwqs */ /* initialize gcwqs */
for_each_gcwq_cpu(cpu) { for_each_gcwq_cpu(cpu) {
struct global_cwq *gcwq = get_gcwq(cpu); struct global_cwq *gcwq = get_gcwq(cpu);
struct worker_pool *pool;
spin_lock_init(&gcwq->lock); spin_lock_init(&gcwq->lock);
gcwq->pool.gcwq = gcwq;
INIT_LIST_HEAD(&gcwq->pool.worklist);
gcwq->cpu = cpu; gcwq->cpu = cpu;
gcwq->flags |= GCWQ_DISASSOCIATED; gcwq->flags |= GCWQ_DISASSOCIATED;
INIT_LIST_HEAD(&gcwq->pool.idle_list);
for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
INIT_HLIST_HEAD(&gcwq->busy_hash[i]); INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
init_timer_deferrable(&gcwq->pool.idle_timer); for_each_worker_pool(pool, gcwq) {
gcwq->pool.idle_timer.function = idle_worker_timeout; pool->gcwq = gcwq;
gcwq->pool.idle_timer.data = (unsigned long)&gcwq->pool; INIT_LIST_HEAD(&pool->worklist);
INIT_LIST_HEAD(&pool->idle_list);
setup_timer(&gcwq->pool.mayday_timer, gcwq_mayday_timeout, init_timer_deferrable(&pool->idle_timer);
(unsigned long)&gcwq->pool); pool->idle_timer.function = idle_worker_timeout;
pool->idle_timer.data = (unsigned long)pool;
ida_init(&gcwq->pool.worker_ida); setup_timer(&pool->mayday_timer, gcwq_mayday_timeout,
(unsigned long)pool);
ida_init(&pool->worker_ida);
}
gcwq->trustee_state = TRUSTEE_DONE; gcwq->trustee_state = TRUSTEE_DONE;
init_waitqueue_head(&gcwq->trustee_wait); init_waitqueue_head(&gcwq->trustee_wait);
...@@ -3843,15 +3921,20 @@ static int __init init_workqueues(void) ...@@ -3843,15 +3921,20 @@ static int __init init_workqueues(void)
/* create the initial worker */ /* create the initial worker */
for_each_online_gcwq_cpu(cpu) { for_each_online_gcwq_cpu(cpu) {
struct global_cwq *gcwq = get_gcwq(cpu); struct global_cwq *gcwq = get_gcwq(cpu);
struct worker *worker; struct worker_pool *pool;
if (cpu != WORK_CPU_UNBOUND) if (cpu != WORK_CPU_UNBOUND)
gcwq->flags &= ~GCWQ_DISASSOCIATED; gcwq->flags &= ~GCWQ_DISASSOCIATED;
worker = create_worker(&gcwq->pool, true);
BUG_ON(!worker); for_each_worker_pool(pool, gcwq) {
spin_lock_irq(&gcwq->lock); struct worker *worker;
start_worker(worker);
spin_unlock_irq(&gcwq->lock); worker = create_worker(pool, true);
BUG_ON(!worker);
spin_lock_irq(&gcwq->lock);
start_worker(worker);
spin_unlock_irq(&gcwq->lock);
}
} }
system_wq = alloc_workqueue("events", 0, 0); system_wq = alloc_workqueue("events", 0, 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment