Commit 6bcaa29d authored by Rusty Russell's avatar Rusty Russell Committed by Linus Torvalds

[PATCH] Hotplug CPUs: Workqueue Changes

Workqueues need to bring up/destroy the per-cpu thread on cpu up/down.

1) Add a global list of workqueues, and keep the name in the structure
   (to name the newly created thread).

2) Remove BUG_ON in run_workqueue, since thread is dragged off CPU when
   it goes down.

3) Lock out cpu up/down in flush_workqueue, create_workqueue and
   destroy_workqueue.

4) Add notifier to add/destroy workqueue threads, and take over work.
parent 010b27dc
...@@ -22,6 +22,8 @@ ...@@ -22,6 +22,8 @@
#include <linux/completion.h> #include <linux/completion.h>
#include <linux/workqueue.h> #include <linux/workqueue.h>
#include <linux/slab.h> #include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h> #include <linux/kthread.h>
/* /*
...@@ -56,8 +58,22 @@ struct cpu_workqueue_struct { ...@@ -56,8 +58,22 @@ struct cpu_workqueue_struct {
*/ */
struct workqueue_struct { struct workqueue_struct {
struct cpu_workqueue_struct cpu_wq[NR_CPUS]; struct cpu_workqueue_struct cpu_wq[NR_CPUS];
const char *name;
struct list_head list;
}; };
#ifdef CONFIG_HOTPLUG_CPU
/* All the workqueues on the system, for hotplug cpu to add/remove
threads to each one as cpus come/go. Protected by cpucontrol
sem. */
static LIST_HEAD(workqueues);
#define add_workqueue(wq) list_add(&(wq)->list, &workqueues)
#define del_workqueue(wq) list_del(&(wq)->list)
#else
#define add_workqueue(wq)
#define del_workqueue(wq)
#endif /* CONFIG_HOTPLUG_CPU */
/* Preempt must be disabled. */ /* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq, static void __queue_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work) struct work_struct *work)
...@@ -161,7 +177,6 @@ static inline void run_workqueue(struct cpu_workqueue_struct *cwq) ...@@ -161,7 +177,6 @@ static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
static int worker_thread(void *__cwq) static int worker_thread(void *__cwq)
{ {
struct cpu_workqueue_struct *cwq = __cwq; struct cpu_workqueue_struct *cwq = __cwq;
int cpu = cwq - cwq->wq->cpu_wq;
DECLARE_WAITQUEUE(wait, current); DECLARE_WAITQUEUE(wait, current);
struct k_sigaction sa; struct k_sigaction sa;
sigset_t blocked; sigset_t blocked;
...@@ -169,7 +184,6 @@ static int worker_thread(void *__cwq) ...@@ -169,7 +184,6 @@ static int worker_thread(void *__cwq)
current->flags |= PF_IOTHREAD; current->flags |= PF_IOTHREAD;
set_user_nice(current, -10); set_user_nice(current, -10);
BUG_ON(smp_processor_id() != cpu);
/* Block and flush all signals */ /* Block and flush all signals */
sigfillset(&blocked); sigfillset(&blocked);
...@@ -219,6 +233,7 @@ void fastcall flush_workqueue(struct workqueue_struct *wq) ...@@ -219,6 +233,7 @@ void fastcall flush_workqueue(struct workqueue_struct *wq)
might_sleep(); might_sleep();
lock_cpu_hotplug();
for (cpu = 0; cpu < NR_CPUS; cpu++) { for (cpu = 0; cpu < NR_CPUS; cpu++) {
DEFINE_WAIT(wait); DEFINE_WAIT(wait);
long sequence_needed; long sequence_needed;
...@@ -248,11 +263,10 @@ void fastcall flush_workqueue(struct workqueue_struct *wq) ...@@ -248,11 +263,10 @@ void fastcall flush_workqueue(struct workqueue_struct *wq)
finish_wait(&cwq->work_done, &wait); finish_wait(&cwq->work_done, &wait);
spin_unlock_irq(&cwq->lock); spin_unlock_irq(&cwq->lock);
} }
unlock_cpu_hotplug();
} }
static int create_workqueue_thread(struct workqueue_struct *wq, static int create_workqueue_thread(struct workqueue_struct *wq, int cpu)
const char *name,
int cpu)
{ {
struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu; struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
struct task_struct *p; struct task_struct *p;
...@@ -266,7 +280,7 @@ static int create_workqueue_thread(struct workqueue_struct *wq, ...@@ -266,7 +280,7 @@ static int create_workqueue_thread(struct workqueue_struct *wq,
init_waitqueue_head(&cwq->more_work); init_waitqueue_head(&cwq->more_work);
init_waitqueue_head(&cwq->work_done); init_waitqueue_head(&cwq->work_done);
p = kthread_create(worker_thread, cwq, "%s/%d", name, cpu); p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
if (IS_ERR(p)) if (IS_ERR(p))
return PTR_ERR(p); return PTR_ERR(p);
cwq->thread = p; cwq->thread = p;
...@@ -286,14 +300,19 @@ struct workqueue_struct *create_workqueue(const char *name) ...@@ -286,14 +300,19 @@ struct workqueue_struct *create_workqueue(const char *name)
return NULL; return NULL;
memset(wq, 0, sizeof(*wq)); memset(wq, 0, sizeof(*wq));
wq->name = name;
/* We don't need the distraction of CPUs appearing and vanishing. */
lock_cpu_hotplug();
for (cpu = 0; cpu < NR_CPUS; cpu++) { for (cpu = 0; cpu < NR_CPUS; cpu++) {
if (!cpu_online(cpu)) if (!cpu_online(cpu))
continue; continue;
if (create_workqueue_thread(wq, name, cpu) < 0) if (create_workqueue_thread(wq, cpu) < 0)
destroy = 1; destroy = 1;
else else
wake_up_process(wq->cpu_wq[cpu].thread); wake_up_process(wq->cpu_wq[cpu].thread);
} }
add_workqueue(wq);
/* /*
* Was there any error during startup? If yes then clean up: * Was there any error during startup? If yes then clean up:
*/ */
...@@ -301,16 +320,23 @@ struct workqueue_struct *create_workqueue(const char *name) ...@@ -301,16 +320,23 @@ struct workqueue_struct *create_workqueue(const char *name)
destroy_workqueue(wq); destroy_workqueue(wq);
wq = NULL; wq = NULL;
} }
unlock_cpu_hotplug();
return wq; return wq;
} }
static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu) static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
{ {
struct cpu_workqueue_struct *cwq; struct cpu_workqueue_struct *cwq;
unsigned long flags;
struct task_struct *p;
cwq = wq->cpu_wq + cpu; cwq = wq->cpu_wq + cpu;
if (cwq->thread) spin_lock_irqsave(&cwq->lock, flags);
kthread_stop(cwq->thread); p = cwq->thread;
cwq->thread = NULL;
spin_unlock_irqrestore(&cwq->lock, flags);
if (p)
kthread_stop(p);
} }
void destroy_workqueue(struct workqueue_struct *wq) void destroy_workqueue(struct workqueue_struct *wq)
...@@ -319,10 +345,14 @@ void destroy_workqueue(struct workqueue_struct *wq) ...@@ -319,10 +345,14 @@ void destroy_workqueue(struct workqueue_struct *wq)
flush_workqueue(wq); flush_workqueue(wq);
/* We don't need the distraction of CPUs appearing and vanishing. */
lock_cpu_hotplug();
for (cpu = 0; cpu < NR_CPUS; cpu++) { for (cpu = 0; cpu < NR_CPUS; cpu++) {
if (cpu_online(cpu)) if (cpu_online(cpu))
cleanup_workqueue_thread(wq, cpu); cleanup_workqueue_thread(wq, cpu);
} }
del_workqueue(wq);
unlock_cpu_hotplug();
kfree(wq); kfree(wq);
} }
...@@ -364,8 +394,75 @@ int current_is_keventd(void) ...@@ -364,8 +394,75 @@ int current_is_keventd(void)
} }
#ifdef CONFIG_HOTPLUG_CPU
/* Take the work from this (downed) CPU. */
static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
{
struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
LIST_HEAD(list);
struct work_struct *work;
spin_lock_irq(&cwq->lock);
list_splice_init(&cwq->worklist, &list);
while (!list_empty(&list)) {
printk("Taking work for %s\n", wq->name);
work = list_entry(list.next,struct work_struct,entry);
list_del(&work->entry);
__queue_work(wq->cpu_wq + smp_processor_id(), work);
}
spin_unlock_irq(&cwq->lock);
}
/* We're holding the cpucontrol mutex here */
static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
unsigned long action,
void *hcpu)
{
unsigned int hotcpu = (unsigned long)hcpu;
struct workqueue_struct *wq;
switch (action) {
case CPU_UP_PREPARE:
/* Create a new workqueue thread for it. */
list_for_each_entry(wq, &workqueues, list) {
if (create_workqueue_thread(wq, hotcpu) < 0) {
printk("workqueue for %i failed\n", hotcpu);
return NOTIFY_BAD;
}
}
break;
case CPU_ONLINE:
/* Kick off worker threads. */
list_for_each_entry(wq, &workqueues, list)
wake_up_process(wq->cpu_wq[hotcpu].thread);
break;
case CPU_UP_CANCELED:
list_for_each_entry(wq, &workqueues, list) {
/* Unbind so it can run. */
kthread_bind(wq->cpu_wq[hotcpu].thread,
smp_processor_id());
cleanup_workqueue_thread(wq, hotcpu);
}
break;
case CPU_DEAD:
list_for_each_entry(wq, &workqueues, list)
cleanup_workqueue_thread(wq, hotcpu);
list_for_each_entry(wq, &workqueues, list)
take_over_work(wq, hotcpu);
break;
}
return NOTIFY_OK;
}
#endif
void init_workqueues(void) void init_workqueues(void)
{ {
hotcpu_notifier(workqueue_cpu_callback, 0);
keventd_wq = create_workqueue("events"); keventd_wq = create_workqueue("events");
BUG_ON(!keventd_wq); BUG_ON(!keventd_wq);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment