Commit 933ba102 authored by Andrew Morton's avatar Andrew Morton Committed by Linus Torvalds

[PATCH] kthread primitive

From: Rusty Russell <rusty@rustcorp.com.au>

These two patches provide the framework for stopping kernel threads to
allow hotplug CPU.  This one just adds kthread.c and kthread.h, next
one uses it.

Most importantly, adds a Monty Python quote to the kernel.

Details:

The hotplug CPU code introduces two major problems:

1) Threads which previously never stopped (migration thread,
   ksoftirqd, keventd) have to be stopped cleanly as CPUs go offline.
2) Threads which previously never had to be created now have
   to be created when a CPU goes online.

Unfortunately, stopping a thread is fairly baroque, involving memory
barriers, a completion and spinning until the task is actually dead
(for example, complete_and_exit() must be used if inside a module).

There are also three problems in starting a thread:
1) Doing it from a random process context risks environment contamination:
   better to do it from keventd to guarantee a clean environment, a-la
   call_usermodehelper.
2) Getting the task struct without races is a hard: see kernel/sched.c
   migration_call(), kernel/workqueue.c create_workqueue_thread().
3) There are races in starting a thread for a CPU which is not yet
   online: migration thread does a complex dance at the moment for
   a similar reason (there may be no migration thread to migrate us).

Place all this logic in some primitives to make life easier:
kthread_create() and kthread_stop().  These primitives require no
extra data-structures in the caller: they operate on normal "struct
task_struct"s.

Other changes:

- Expose keventd_up(), as keventd and migration threads will use kthread to
  launch, and kthread normally uses workqueues and must recognize this case.

- Kthreads created at boot before "keventd" are spawned directly.  However,
  this means that they don't have all signals blocked, and hence can be
  killed.  The simplest solution is to always explicitly block all signals in
  the kthread.

- Change over the migration threads, the workqueue threads and the
  ksoftirqd threads to use kthread.

- module.c currently spawns threads directly to stop the machine, so a
  module can be atomically tested for removal.

- Unfortunately, this means that the current task is manipulated (which
  races with set_cpus_allowed, for example), and it can't set its priority
  artificially high.  Using a kernel thread can solve this cleanly, and with
  kthread_run, it's simple.

- kthreads use keventd, so they inherit its cpus_allowed mask.  Unset it.
  All current users set it explicity anyway, but it's nice to fix.

- call_usermode_helper uses keventd, so the process created inherits its
  cpus_allowed mask.  Unset it.

- Prevent errors in boot when cpus_possible() contains a cpu which is not
  online (ie.  a cpu didn't come up).  This doesn't happen on x86, since a
  boot failure makes that CPU no longer possible (hacky, but it works).

- When the cpu fails to come up, some callbacks do kthread_stop(), which
  doesn't work without keventd (which hasn't started yet).  Call it directly,
  and take care that it restores signal state (note: do_sigaction does a
  flush on blocked signals, so we don't need to repeat it).
parent ad77865c
#ifndef _LINUX_KTHREAD_H
#define _LINUX_KTHREAD_H
/* Simple interface for creating and stopping kernel threads without mess. */
#include <linux/err.h>
#include <linux/sched.h>
/**
* kthread_create: create a kthread.
* @threadfn: the function to run until signal_pending(current).
* @data: data ptr for @threadfn.
* @namefmt: printf-style name for the thread.
*
* Description: This helper function creates and names a kernel
* thread. The thread will be stopped: use wake_up_process() to start
* it. See also kthread_run(), kthread_create_on_cpu().
*
* When woken, the thread will run @threadfn() with @data as its
* argument. @threadfn can either call do_exit() directly if it is a
* standalone thread for which noone will call kthread_stop(), or
* return when 'kthread_should_stop()' is true (which means
* kthread_stop() has been called). The return value should be zero
* or a negative error number: it will be passed to kthread_stop().
*
* Returns a task_struct or ERR_PTR(-ENOMEM).
*/
struct task_struct *kthread_create(int (*threadfn)(void *data),
void *data,
const char namefmt[], ...);
/**
* kthread_run: create and wake a thread.
* @threadfn: the function to run until signal_pending(current).
* @data: data ptr for @threadfn.
* @namefmt: printf-style name for the thread.
*
* Description: Convenient wrapper for kthread_create() followed by
* wake_up_process(). Returns the kthread, or ERR_PTR(-ENOMEM). */
#define kthread_run(threadfn, data, namefmt, ...) \
({ \
struct task_struct *__k \
= kthread_create(threadfn, data, namefmt, ## __VA_ARGS__); \
if (!IS_ERR(__k)) \
wake_up_process(__k); \
__k; \
})
/**
* kthread_bind: bind a just-created kthread to a cpu.
* @k: thread created by kthread_create().
* @cpu: cpu (might not be online, must be possible) for @k to run on.
*
* Description: This function is equivalent to set_cpus_allowed(),
* except that @cpu doesn't need to be online, and the thread must be
* stopped (ie. just returned from kthread_create().
*/
void kthread_bind(struct task_struct *k, unsigned int cpu);
/**
* kthread_stop: stop a thread created by kthread_create().
* @k: thread created by kthread_create().
*
* Sets kthread_should_stop() for @k to return true, wakes it, and
* waits for it to exit. Your threadfn() must not call do_exit()
* itself if you use this function! This can also be called after
* kthread_create() instead of calling wake_up_process(): the thread
* will exit without calling threadfn().
*
* Returns the result of threadfn(), or -EINTR if wake_up_process()
* was never called. */
int kthread_stop(struct task_struct *k);
/**
* kthread_should_stop: should this kthread return now?
*
* When someone calls kthread_stop on your kthread, it will be woken
* and this will return true. You should then return, and your return
* value will be passed through to kthread_stop().
*/
int kthread_should_stop(void);
#endif /* _LINUX_KTHREAD_H */
...@@ -708,6 +708,8 @@ extern task_t *child_reaper; ...@@ -708,6 +708,8 @@ extern task_t *child_reaper;
extern int do_execve(char *, char __user * __user *, char __user * __user *, struct pt_regs *); extern int do_execve(char *, char __user * __user *, char __user * __user *, struct pt_regs *);
extern long do_fork(unsigned long, unsigned long, struct pt_regs *, unsigned long, int __user *, int __user *); extern long do_fork(unsigned long, unsigned long, struct pt_regs *, unsigned long, int __user *, int __user *);
extern struct task_struct * copy_process(unsigned long, unsigned long, struct pt_regs *, unsigned long, int __user *, int __user *); extern struct task_struct * copy_process(unsigned long, unsigned long, struct pt_regs *, unsigned long, int __user *, int __user *);
extern asmlinkage long sys_sched_setscheduler(pid_t pid, int policy,
struct sched_param __user *parm);
#ifdef CONFIG_SMP #ifdef CONFIG_SMP
extern void wait_task_inactive(task_t * p); extern void wait_task_inactive(task_t * p);
......
...@@ -60,6 +60,7 @@ extern int FASTCALL(schedule_work(struct work_struct *work)); ...@@ -60,6 +60,7 @@ extern int FASTCALL(schedule_work(struct work_struct *work));
extern int FASTCALL(schedule_delayed_work(struct work_struct *work, unsigned long delay)); extern int FASTCALL(schedule_delayed_work(struct work_struct *work, unsigned long delay));
extern void flush_scheduled_work(void); extern void flush_scheduled_work(void);
extern int current_is_keventd(void); extern int current_is_keventd(void);
extern int keventd_up(void);
extern void init_workqueues(void); extern void init_workqueues(void);
......
...@@ -6,7 +6,8 @@ obj-y = sched.o fork.o exec_domain.o panic.o printk.o profile.o \ ...@@ -6,7 +6,8 @@ obj-y = sched.o fork.o exec_domain.o panic.o printk.o profile.o \
exit.o itimer.o time.o softirq.o resource.o \ exit.o itimer.o time.o softirq.o resource.o \
sysctl.o capability.o ptrace.o timer.o user.o \ sysctl.o capability.o ptrace.o timer.o user.o \
signal.o sys.o kmod.o workqueue.o pid.o \ signal.o sys.o kmod.o workqueue.o pid.o \
rcupdate.o intermodule.o extable.o params.o posix-timers.o rcupdate.o intermodule.o extable.o params.o posix-timers.o \
kthread.o
obj-$(CONFIG_FUTEX) += futex.o obj-$(CONFIG_FUTEX) += futex.o
obj-$(CONFIG_GENERIC_ISA_DMA) += dma.o obj-$(CONFIG_GENERIC_ISA_DMA) += dma.o
......
...@@ -149,6 +149,7 @@ static int ____call_usermodehelper(void *data) ...@@ -149,6 +149,7 @@ static int ____call_usermodehelper(void *data)
{ {
struct subprocess_info *sub_info = data; struct subprocess_info *sub_info = data;
int retval; int retval;
cpumask_t mask = CPU_MASK_ALL;
/* Unblock all signals. */ /* Unblock all signals. */
flush_signals(current); flush_signals(current);
...@@ -158,6 +159,9 @@ static int ____call_usermodehelper(void *data) ...@@ -158,6 +159,9 @@ static int ____call_usermodehelper(void *data)
recalc_sigpending(); recalc_sigpending();
spin_unlock_irq(&current->sighand->siglock); spin_unlock_irq(&current->sighand->siglock);
/* We can run anywhere, unlike our parent keventd(). */
set_cpus_allowed(current, mask);
retval = -EPERM; retval = -EPERM;
if (current->fs->root) if (current->fs->root)
retval = execve(sub_info->path, sub_info->argv,sub_info->envp); retval = execve(sub_info->path, sub_info->argv,sub_info->envp);
......
/* Kernel thread helper functions.
* Copyright (C) 2004 IBM Corporation, Rusty Russell.
*
* Creation is done via keventd, so that we get a clean environment
* even if we're invoked from userspace (think modprobe, hotplug cpu,
* etc.).
*/
#include <linux/sched.h>
#include <linux/kthread.h>
#include <linux/completion.h>
#include <linux/err.h>
#include <linux/unistd.h>
#include <asm/semaphore.h>
struct kthread_create_info
{
/* Information passed to kthread() from keventd. */
int (*threadfn)(void *data);
void *data;
struct completion started;
/* Result passed back to kthread_create() from keventd. */
struct task_struct *result;
struct completion done;
};
struct kthread_stop_info
{
struct task_struct *k;
int err;
struct completion done;
};
/* Thread stopping is done by setthing this var: lock serializes
* multiple kthread_stop calls. */
static DECLARE_MUTEX(kthread_stop_lock);
static struct kthread_stop_info kthread_stop_info;
int kthread_should_stop(void)
{
return (kthread_stop_info.k == current);
}
static int kthread(void *_create)
{
struct kthread_create_info *create = _create;
int (*threadfn)(void *data);
void *data;
sigset_t blocked;
int ret = -EINTR;
cpumask_t mask = CPU_MASK_ALL;
/* Copy data: it's on keventd's stack */
threadfn = create->threadfn;
data = create->data;
/* Block and flush all signals (in case we're not from keventd). */
sigfillset(&blocked);
sigprocmask(SIG_BLOCK, &blocked, NULL);
flush_signals(current);
/* By default we can run anywhere, unlike keventd. */
set_cpus_allowed(current, mask);
/* OK, tell user we're spawned, wait for stop or wakeup */
__set_current_state(TASK_INTERRUPTIBLE);
complete(&create->started);
schedule();
if (!kthread_should_stop())
ret = threadfn(data);
/* It might have exited on its own, w/o kthread_stop. Check. */
if (kthread_should_stop()) {
kthread_stop_info.err = ret;
complete(&kthread_stop_info.done);
}
return 0;
}
/* We are keventd: create a thread. */
static void keventd_create_kthread(void *_create)
{
struct kthread_create_info *create = _create;
int pid;
/* We want our own signal handler (we take no signals by default). */
pid = kernel_thread(kthread, create, CLONE_FS | CLONE_FILES | SIGCHLD);
if (pid < 0) {
create->result = ERR_PTR(pid);
} else {
wait_for_completion(&create->started);
create->result = find_task_by_pid(pid);
wait_task_inactive(create->result);
}
complete(&create->done);
}
struct task_struct *kthread_create(int (*threadfn)(void *data),
void *data,
const char namefmt[],
...)
{
struct kthread_create_info create;
DECLARE_WORK(work, keventd_create_kthread, &create);
create.threadfn = threadfn;
create.data = data;
init_completion(&create.started);
init_completion(&create.done);
/* If we're being called to start the first workqueue, we
* can't use keventd. */
if (!keventd_up())
work.func(work.data);
else {
schedule_work(&work);
wait_for_completion(&create.done);
}
if (!IS_ERR(create.result)) {
va_list args;
va_start(args, namefmt);
vsnprintf(create.result->comm, sizeof(create.result->comm),
namefmt, args);
va_end(args);
}
return create.result;
}
void kthread_bind(struct task_struct *k, unsigned int cpu)
{
BUG_ON(k->state != TASK_INTERRUPTIBLE);
k->thread_info->cpu = cpu;
k->cpus_allowed = cpumask_of_cpu(cpu);
}
int kthread_stop(struct task_struct *k)
{
int ret;
down(&kthread_stop_lock);
/* It could exit after stop_info.k set, but before wake_up_process. */
get_task_struct(k);
/* Must init completion *before* thread sees kthread_stop_info.k */
init_completion(&kthread_stop_info.done);
wmb();
/* Now set kthread_should_stop() to true, and wake it up. */
kthread_stop_info.k = k;
wake_up_process(k);
put_task_struct(k);
/* Once it dies, reset stop ptr, gather result and we're done. */
wait_for_completion(&kthread_stop_info.done);
kthread_stop_info.k = NULL;
ret = kthread_stop_info.err;
up(&kthread_stop_lock);
return ret;
}
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include <linux/err.h> #include <linux/err.h>
#include <linux/vermagic.h> #include <linux/vermagic.h>
#include <linux/notifier.h> #include <linux/notifier.h>
#include <linux/kthread.h>
#include <asm/uaccess.h> #include <asm/uaccess.h>
#include <asm/semaphore.h> #include <asm/semaphore.h>
#include <asm/pgalloc.h> #include <asm/pgalloc.h>
...@@ -457,6 +458,40 @@ static void module_unload_free(struct module *mod) ...@@ -457,6 +458,40 @@ static void module_unload_free(struct module *mod)
} }
} }
#ifdef CONFIG_MODULE_FORCE_UNLOAD
static inline int try_force(unsigned int flags)
{
int ret = (flags & O_TRUNC);
if (ret)
tainted |= TAINT_FORCED_MODULE;
return ret;
}
#else
static inline int try_force(unsigned int flags)
{
return 0;
}
#endif /* CONFIG_MODULE_FORCE_UNLOAD */
static int try_stop_module_local(struct module *mod, int flags, int *forced)
{
local_irq_disable();
/* If it's not unused, quit unless we are told to block. */
if ((flags & O_NONBLOCK) && module_refcount(mod) != 0) {
if (!(*forced = try_force(flags))) {
local_irq_enable();
return -EWOULDBLOCK;
}
}
/* Mark it as dying. */
mod->waiter = current;
mod->state = MODULE_STATE_GOING;
local_irq_enable();
return 0;
}
#ifdef CONFIG_SMP #ifdef CONFIG_SMP
/* Thread to stop each CPU in user context. */ /* Thread to stop each CPU in user context. */
enum stopref_state { enum stopref_state {
...@@ -475,13 +510,6 @@ static int stopref(void *cpu) ...@@ -475,13 +510,6 @@ static int stopref(void *cpu)
int irqs_disabled = 0; int irqs_disabled = 0;
int prepared = 0; int prepared = 0;
sprintf(current->comm, "kmodule%lu\n", (unsigned long)cpu);
/* Highest priority we can manage, and move to right CPU. */
#if 0 /* FIXME */
struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
setscheduler(current->pid, SCHED_FIFO, &param);
#endif
set_cpus_allowed(current, cpumask_of_cpu((int)(long)cpu)); set_cpus_allowed(current, cpumask_of_cpu((int)(long)cpu));
/* Ack: we are alive */ /* Ack: we are alive */
...@@ -535,29 +563,33 @@ static void stopref_set_state(enum stopref_state state, int sleep) ...@@ -535,29 +563,33 @@ static void stopref_set_state(enum stopref_state state, int sleep)
} }
} }
/* Stop the machine. Disables irqs. */ struct stopref
static int stop_refcounts(void) {
struct module *mod;
int flags;
int *forced;
struct completion started;
};
static int spawn_stopref(void *data)
{ {
unsigned int i, cpu; struct stopref *sref = data;
cpumask_t old_allowed; struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
unsigned int i, cpu = smp_processor_id();
int ret = 0; int ret = 0;
/* One thread per cpu. We'll do our own. */ complete(&sref->started);
cpu = smp_processor_id();
/* FIXME: racy with set_cpus_allowed. */ /* One high-prio thread per cpu. We'll do one (any one). */
old_allowed = current->cpus_allowed;
set_cpus_allowed(current, cpumask_of_cpu(cpu)); set_cpus_allowed(current, cpumask_of_cpu(cpu));
sys_sched_setscheduler(current->pid, SCHED_FIFO, &param);
atomic_set(&stopref_thread_ack, 0); atomic_set(&stopref_thread_ack, 0);
stopref_num_threads = 0; stopref_num_threads = 0;
stopref_state = STOPREF_WAIT; stopref_state = STOPREF_WAIT;
/* No CPUs can come up or down during this. */ for_each_online_cpu(i) {
lock_cpu_hotplug(); if (i == cpu)
for (i = 0; i < NR_CPUS; i++) {
if (i == cpu || !cpu_online(i))
continue; continue;
ret = kernel_thread(stopref, (void *)(long)i, CLONE_KERNEL); ret = kernel_thread(stopref, (void *)(long)i, CLONE_KERNEL);
if (ret < 0) if (ret < 0)
...@@ -572,40 +604,57 @@ static int stop_refcounts(void) ...@@ -572,40 +604,57 @@ static int stop_refcounts(void)
/* If some failed, kill them all. */ /* If some failed, kill them all. */
if (ret < 0) { if (ret < 0) {
stopref_set_state(STOPREF_EXIT, 1); stopref_set_state(STOPREF_EXIT, 1);
unlock_cpu_hotplug(); goto out;
return ret;
} }
/* Don't schedule us away at this point, please. */ /* Don't schedule us away at this point, please. */
preempt_disable(); preempt_disable();
/* Now they are all scheduled, make them hold the CPUs, ready. */ /* Now they are all started, make them hold the CPUs, ready. */
stopref_set_state(STOPREF_PREPARE, 0); stopref_set_state(STOPREF_PREPARE, 0);
/* Make them disable irqs. */ /* Make them disable irqs. */
stopref_set_state(STOPREF_DISABLE_IRQ, 0); stopref_set_state(STOPREF_DISABLE_IRQ, 0);
local_irq_disable(); /* Atomically disable module if possible */
return 0; ret = try_stop_module_local(sref->mod, sref->flags, sref->forced);
}
/* Restart the machine. Re-enables irqs. */
static void restart_refcounts(void)
{
stopref_set_state(STOPREF_EXIT, 0); stopref_set_state(STOPREF_EXIT, 0);
local_irq_enable();
preempt_enable(); preempt_enable();
unlock_cpu_hotplug();
out:
/* Wait for kthread_stop */
while (!kthread_should_stop()) {
__set_current_state(TASK_INTERRUPTIBLE);
schedule();
}
return ret;
} }
#else /* ...!SMP */
static inline int stop_refcounts(void) static int try_stop_module(struct module *mod, int flags, int *forced)
{ {
local_irq_disable(); struct task_struct *p;
return 0; struct stopref sref = { mod, flags, forced };
int ret;
init_completion(&sref.started);
/* No CPUs can come up or down during this. */
lock_cpu_hotplug();
p = kthread_run(spawn_stopref, &sref, "krmmod");
if (IS_ERR(p))
ret = PTR_ERR(p);
else {
wait_for_completion(&sref.started);
ret = kthread_stop(p);
}
unlock_cpu_hotplug();
return ret;
} }
static inline void restart_refcounts(void) #else /* ...!SMP */
static inline int try_stop_module(struct module *mod, int flags, int *forced)
{ {
local_irq_enable(); return try_stop_module_local(mod, flags, forced);
} }
#endif #endif
...@@ -622,21 +671,6 @@ EXPORT_SYMBOL(module_refcount); ...@@ -622,21 +671,6 @@ EXPORT_SYMBOL(module_refcount);
/* This exists whether we can unload or not */ /* This exists whether we can unload or not */
static void free_module(struct module *mod); static void free_module(struct module *mod);
#ifdef CONFIG_MODULE_FORCE_UNLOAD
static inline int try_force(unsigned int flags)
{
int ret = (flags & O_TRUNC);
if (ret)
tainted |= TAINT_FORCED_MODULE;
return ret;
}
#else
static inline int try_force(unsigned int flags)
{
return 0;
}
#endif /* CONFIG_MODULE_FORCE_UNLOAD */
/* Stub function for modules which don't have an exitfn */ /* Stub function for modules which don't have an exitfn */
void cleanup_module(void) void cleanup_module(void)
{ {
...@@ -706,26 +740,9 @@ sys_delete_module(const char __user *name_user, unsigned int flags) ...@@ -706,26 +740,9 @@ sys_delete_module(const char __user *name_user, unsigned int flags)
goto out; goto out;
} }
} }
/* Stop the machine so refcounts can't move: irqs disabled. */
DEBUGP("Stopping refcounts...\n");
ret = stop_refcounts();
if (ret != 0)
goto out;
/* If it's not unused, quit unless we are told to block. */
if ((flags & O_NONBLOCK) && module_refcount(mod) != 0) {
forced = try_force(flags);
if (!forced) {
ret = -EWOULDBLOCK;
restart_refcounts();
goto out;
}
}
/* Mark it as dying. */ /* Stop the machine so refcounts can't move and disable module. */
mod->waiter = current; ret = try_stop_module(mod, flags, &forced);
mod->state = MODULE_STATE_GOING;
restart_refcounts();
/* Never wait if forced. */ /* Never wait if forced. */
if (!forced && module_refcount(mod) != 0) if (!forced && module_refcount(mod) != 0)
......
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include <linux/rcupdate.h> #include <linux/rcupdate.h>
#include <linux/cpu.h> #include <linux/cpu.h>
#include <linux/percpu.h> #include <linux/percpu.h>
#include <linux/kthread.h>
#ifdef CONFIG_NUMA #ifdef CONFIG_NUMA
#define cpu_to_node_mask(cpu) node_to_cpumask(cpu_to_node(cpu)) #define cpu_to_node_mask(cpu) node_to_cpumask(cpu_to_node(cpu))
...@@ -2749,12 +2750,6 @@ static void move_task_away(struct task_struct *p, int dest_cpu) ...@@ -2749,12 +2750,6 @@ static void move_task_away(struct task_struct *p, int dest_cpu)
local_irq_restore(flags); local_irq_restore(flags);
} }
typedef struct {
int cpu;
struct completion startup_done;
task_t *task;
} migration_startup_t;
/* /*
* migration_thread - this is a highprio system thread that performs * migration_thread - this is a highprio system thread that performs
* thread migration by bumping thread off CPU then 'pushing' onto * thread migration by bumping thread off CPU then 'pushing' onto
...@@ -2764,27 +2759,17 @@ static int migration_thread(void * data) ...@@ -2764,27 +2759,17 @@ static int migration_thread(void * data)
{ {
/* Marking "param" __user is ok, since we do a set_fs(KERNEL_DS); */ /* Marking "param" __user is ok, since we do a set_fs(KERNEL_DS); */
struct sched_param __user param = { .sched_priority = MAX_RT_PRIO-1 }; struct sched_param __user param = { .sched_priority = MAX_RT_PRIO-1 };
migration_startup_t *startup = data;
int cpu = startup->cpu;
runqueue_t *rq; runqueue_t *rq;
int cpu = (long)data;
int ret; int ret;
startup->task = current;
complete(&startup->startup_done);
set_current_state(TASK_UNINTERRUPTIBLE);
schedule();
BUG_ON(smp_processor_id() != cpu); BUG_ON(smp_processor_id() != cpu);
daemonize("migration/%d", cpu);
set_fs(KERNEL_DS);
ret = setscheduler(0, SCHED_FIFO, &param); ret = setscheduler(0, SCHED_FIFO, &param);
rq = this_rq(); rq = this_rq();
rq->migration_thread = current; BUG_ON(rq->migration_thread != current);
for (;;) { while (!kthread_should_stop()) {
struct list_head *head; struct list_head *head;
migration_req_t *req; migration_req_t *req;
...@@ -2807,6 +2792,7 @@ static int migration_thread(void * data) ...@@ -2807,6 +2792,7 @@ static int migration_thread(void * data)
any_online_cpu(req->task->cpus_allowed)); any_online_cpu(req->task->cpus_allowed));
complete(&req->done); complete(&req->done);
} }
return 0;
} }
/* /*
...@@ -2816,37 +2802,27 @@ static int migration_thread(void * data) ...@@ -2816,37 +2802,27 @@ static int migration_thread(void * data)
static int migration_call(struct notifier_block *nfb, unsigned long action, static int migration_call(struct notifier_block *nfb, unsigned long action,
void *hcpu) void *hcpu)
{ {
long cpu = (long)hcpu; int cpu = (long)hcpu;
migration_startup_t startup; struct task_struct *p;
switch (action) { switch (action) {
case CPU_ONLINE: case CPU_ONLINE:
p = kthread_create(migration_thread, hcpu, "migration/%d",cpu);
printk("Starting migration thread for cpu %li\n", cpu); if (IS_ERR(p))
return NOTIFY_BAD;
startup.cpu = cpu; kthread_bind(p, cpu);
startup.task = NULL; cpu_rq(cpu)->migration_thread = p;
init_completion(&startup.startup_done); wake_up_process(p);
kernel_thread(migration_thread, &startup, CLONE_KERNEL);
wait_for_completion(&startup.startup_done);
wait_task_inactive(startup.task);
startup.task->thread_info->cpu = cpu;
startup.task->cpus_allowed = cpumask_of_cpu(cpu);
wake_up_process(startup.task);
while (!cpu_rq(cpu)->migration_thread)
yield();
break; break;
} }
return NOTIFY_OK; return NOTIFY_OK;
} }
static struct notifier_block migration_notifier /* Want this before the other threads, so they can use set_cpus_allowed. */
= { .notifier_call = &migration_call }; static struct notifier_block migration_notifier = {
.notifier_call = &migration_call,
.priority = 10,
};
__init int migration_init(void) __init int migration_init(void)
{ {
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#include <linux/notifier.h> #include <linux/notifier.h>
#include <linux/percpu.h> #include <linux/percpu.h>
#include <linux/cpu.h> #include <linux/cpu.h>
#include <linux/kthread.h>
/* /*
- No shared variables, all the data are CPU local. - No shared variables, all the data are CPU local.
...@@ -337,20 +338,14 @@ static int ksoftirqd(void * __bind_cpu) ...@@ -337,20 +338,14 @@ static int ksoftirqd(void * __bind_cpu)
{ {
int cpu = (int) (long) __bind_cpu; int cpu = (int) (long) __bind_cpu;
daemonize("ksoftirqd/%d", cpu);
set_user_nice(current, 19); set_user_nice(current, 19);
current->flags |= PF_IOTHREAD; current->flags |= PF_IOTHREAD;
/* Migrate to the right CPU */
set_cpus_allowed(current, cpumask_of_cpu(cpu));
BUG_ON(smp_processor_id() != cpu); BUG_ON(smp_processor_id() != cpu);
__set_current_state(TASK_INTERRUPTIBLE); set_current_state(TASK_INTERRUPTIBLE);
mb();
__get_cpu_var(ksoftirqd) = current;
for (;;) { while (!kthread_should_stop()) {
if (!local_softirq_pending()) if (!local_softirq_pending())
schedule(); schedule();
...@@ -363,6 +358,7 @@ static int ksoftirqd(void * __bind_cpu) ...@@ -363,6 +358,7 @@ static int ksoftirqd(void * __bind_cpu)
__set_current_state(TASK_INTERRUPTIBLE); __set_current_state(TASK_INTERRUPTIBLE);
} }
return 0;
} }
static int __devinit cpu_callback(struct notifier_block *nfb, static int __devinit cpu_callback(struct notifier_block *nfb,
...@@ -370,15 +366,17 @@ static int __devinit cpu_callback(struct notifier_block *nfb, ...@@ -370,15 +366,17 @@ static int __devinit cpu_callback(struct notifier_block *nfb,
void *hcpu) void *hcpu)
{ {
int hotcpu = (unsigned long)hcpu; int hotcpu = (unsigned long)hcpu;
struct task_struct *p;
if (action == CPU_ONLINE) { if (action == CPU_ONLINE) {
if (kernel_thread(ksoftirqd, hcpu, CLONE_KERNEL) < 0) { p = kthread_create(ksoftirqd, hcpu, "ksoftirqd/%d", hotcpu);
if (IS_ERR(p)) {
printk("ksoftirqd for %i failed\n", hotcpu); printk("ksoftirqd for %i failed\n", hotcpu);
return NOTIFY_BAD; return NOTIFY_BAD;
} }
per_cpu(ksoftirqd, hotcpu) = p;
while (!per_cpu(ksoftirqd, hotcpu)) kthread_bind(p, hotcpu);
yield(); wake_up_process(p);
} }
return NOTIFY_OK; return NOTIFY_OK;
} }
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include <linux/completion.h> #include <linux/completion.h>
#include <linux/workqueue.h> #include <linux/workqueue.h>
#include <linux/slab.h> #include <linux/slab.h>
#include <linux/kthread.h>
/* /*
* The per-CPU workqueue. * The per-CPU workqueue.
...@@ -45,7 +46,6 @@ struct cpu_workqueue_struct { ...@@ -45,7 +46,6 @@ struct cpu_workqueue_struct {
struct workqueue_struct *wq; struct workqueue_struct *wq;
task_t *thread; task_t *thread;
struct completion exit;
} ____cacheline_aligned; } ____cacheline_aligned;
...@@ -153,28 +153,23 @@ static inline void run_workqueue(struct cpu_workqueue_struct *cwq) ...@@ -153,28 +153,23 @@ static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
spin_unlock_irqrestore(&cwq->lock, flags); spin_unlock_irqrestore(&cwq->lock, flags);
} }
typedef struct startup_s { static int worker_thread(void *__cwq)
struct cpu_workqueue_struct *cwq;
struct completion done;
const char *name;
} startup_t;
static int worker_thread(void *__startup)
{ {
startup_t *startup = __startup; struct cpu_workqueue_struct *cwq = __cwq;
struct cpu_workqueue_struct *cwq = startup->cwq;
int cpu = cwq - cwq->wq->cpu_wq; int cpu = cwq - cwq->wq->cpu_wq;
DECLARE_WAITQUEUE(wait, current); DECLARE_WAITQUEUE(wait, current);
struct k_sigaction sa; struct k_sigaction sa;
sigset_t blocked;
daemonize("%s/%d", startup->name, cpu);
current->flags |= PF_IOTHREAD; current->flags |= PF_IOTHREAD;
cwq->thread = current;
set_user_nice(current, -10); set_user_nice(current, -10);
set_cpus_allowed(current, cpumask_of_cpu(cpu)); BUG_ON(smp_processor_id() != cpu);
complete(&startup->done); /* Block and flush all signals */
sigfillset(&blocked);
sigprocmask(SIG_BLOCK, &blocked, NULL);
flush_signals(current);
/* SIG_IGN makes children autoreap: see do_notify_parent(). */ /* SIG_IGN makes children autoreap: see do_notify_parent(). */
sa.sa.sa_handler = SIG_IGN; sa.sa.sa_handler = SIG_IGN;
...@@ -182,12 +177,10 @@ static int worker_thread(void *__startup) ...@@ -182,12 +177,10 @@ static int worker_thread(void *__startup)
siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD)); siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0); do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
for (;;) { while (!kthread_should_stop()) {
set_task_state(current, TASK_INTERRUPTIBLE); set_task_state(current, TASK_INTERRUPTIBLE);
add_wait_queue(&cwq->more_work, &wait); add_wait_queue(&cwq->more_work, &wait);
if (!cwq->thread)
break;
if (list_empty(&cwq->worklist)) if (list_empty(&cwq->worklist))
schedule(); schedule();
else else
...@@ -197,9 +190,6 @@ static int worker_thread(void *__startup) ...@@ -197,9 +190,6 @@ static int worker_thread(void *__startup)
if (!list_empty(&cwq->worklist)) if (!list_empty(&cwq->worklist))
run_workqueue(cwq); run_workqueue(cwq);
} }
remove_wait_queue(&cwq->more_work, &wait);
complete(&cwq->exit);
return 0; return 0;
} }
...@@ -251,9 +241,8 @@ static int create_workqueue_thread(struct workqueue_struct *wq, ...@@ -251,9 +241,8 @@ static int create_workqueue_thread(struct workqueue_struct *wq,
const char *name, const char *name,
int cpu) int cpu)
{ {
startup_t startup;
struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu; struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
int ret; struct task_struct *p;
spin_lock_init(&cwq->lock); spin_lock_init(&cwq->lock);
cwq->wq = wq; cwq->wq = wq;
...@@ -263,17 +252,13 @@ static int create_workqueue_thread(struct workqueue_struct *wq, ...@@ -263,17 +252,13 @@ static int create_workqueue_thread(struct workqueue_struct *wq,
INIT_LIST_HEAD(&cwq->worklist); INIT_LIST_HEAD(&cwq->worklist);
init_waitqueue_head(&cwq->more_work); init_waitqueue_head(&cwq->more_work);
init_waitqueue_head(&cwq->work_done); init_waitqueue_head(&cwq->work_done);
init_completion(&cwq->exit);
p = kthread_create(worker_thread, cwq, "%s/%d", name, cpu);
init_completion(&startup.done); if (IS_ERR(p))
startup.cwq = cwq; return PTR_ERR(p);
startup.name = name; cwq->thread = p;
ret = kernel_thread(worker_thread, &startup, CLONE_FS | CLONE_FILES); kthread_bind(p, cpu);
if (ret >= 0) { return 0;
wait_for_completion(&startup.done);
BUG_ON(!cwq->thread);
}
return ret;
} }
struct workqueue_struct *create_workqueue(const char *name) struct workqueue_struct *create_workqueue(const char *name)
...@@ -292,6 +277,8 @@ struct workqueue_struct *create_workqueue(const char *name) ...@@ -292,6 +277,8 @@ struct workqueue_struct *create_workqueue(const char *name)
continue; continue;
if (create_workqueue_thread(wq, name, cpu) < 0) if (create_workqueue_thread(wq, name, cpu) < 0)
destroy = 1; destroy = 1;
else
wake_up_process(wq->cpu_wq[cpu].thread);
} }
/* /*
* Was there any error during startup? If yes then clean up: * Was there any error during startup? If yes then clean up:
...@@ -308,13 +295,8 @@ static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu) ...@@ -308,13 +295,8 @@ static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
struct cpu_workqueue_struct *cwq; struct cpu_workqueue_struct *cwq;
cwq = wq->cpu_wq + cpu; cwq = wq->cpu_wq + cpu;
if (cwq->thread) { if (cwq->thread)
/* Tell thread to exit and wait for it. */ kthread_stop(cwq->thread);
cwq->thread = NULL;
wake_up(&cwq->more_work);
wait_for_completion(&cwq->exit);
}
} }
void destroy_workqueue(struct workqueue_struct *wq) void destroy_workqueue(struct workqueue_struct *wq)
...@@ -347,6 +329,11 @@ void flush_scheduled_work(void) ...@@ -347,6 +329,11 @@ void flush_scheduled_work(void)
flush_workqueue(keventd_wq); flush_workqueue(keventd_wq);
} }
int keventd_up(void)
{
return keventd_wq != NULL;
}
int current_is_keventd(void) int current_is_keventd(void)
{ {
struct cpu_workqueue_struct *cwq; struct cpu_workqueue_struct *cwq;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment