Commit 45141eea authored by Linus Torvalds

Merge branch 'for-4.1' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq

Pull workqueue updates from Tejun Heo:
 "Workqueue now prints debug information at the end of sysrq-t which
  should be helpful when tracking down suspected workqueue stalls.  It
  only prints out the ones with something currently going on so it
  shouldn't add much output in most cases"

* 'for-4.1' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq:
  workqueue: Reorder sysfs code
  percpu: Fix trivial typos in comments
  workqueue: dump workqueues on sysrq-t
  workqueue: keep track of the flushing task and pool manager
  workqueue: make the workqueues list RCU walkable
parents 8954672d 6ba94429
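
Editor's note: the state dump added by this series is exposed as show_workqueue_state() (declared in the workqueue header hunk below) and wired into the sysrq-t handler, so it can be triggered from userspace with "echo t > /proc/sysrq-trigger". As a rough sketch that is not part of this commit, a kernel-side debugging path that suspects a stall could also call it directly; the helper my_debug_dump() below is hypothetical:

	#include <linux/printk.h>
	#include <linux/workqueue.h>

	/*
	 * Hypothetical example, not in this patch: dump workqueue and
	 * worker-pool state directly when a stall is suspected, instead
	 * of going through sysrq-t.
	 */
	static void my_debug_dump(void)
	{
		pr_warn("suspected workqueue stall, dumping state\n");
		show_workqueue_state();	/* declaration added by this series */
	}
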
@@ -275,6 +275,7 @@ static struct sysrq_key_op sysrq_showregs_op = {
static void sysrq_handle_showstate(int key)
{
	show_state();
+	show_workqueue_state();
}

static struct sysrq_key_op sysrq_showstate_op = {
	.handler	= sysrq_handle_showstate,
......
@@ -454,6 +454,7 @@ extern bool workqueue_congested(int cpu, struct workqueue_struct *wq);
extern unsigned int work_busy(struct work_struct *work);
extern __printf(1, 2) void set_worker_desc(const char *fmt, ...);
extern void print_worker_info(const char *log_lvl, struct task_struct *task);
+extern void show_workqueue_state(void);

/**
 * queue_work - queue work on a workqueue
......
@@ -159,6 +159,7 @@ struct worker_pool {

	/* see manage_workers() for details on the two manager mutexes */
	struct mutex		manager_arb;	/* manager arbitration */
+	struct worker		*manager;	/* L: purely informational */
	struct mutex		attach_mutex;	/* attach/detach exclusion */
	struct list_head	workers;	/* A: attached workers */
	struct completion	*detach_completion; /* all workers detached */
@@ -230,7 +231,7 @@ struct wq_device;
 */
struct workqueue_struct {
	struct list_head	pwqs;		/* WR: all pwqs of this wq */
-	struct list_head	list;		/* PL: list of all workqueues */
+	struct list_head	list;		/* PR: list of all workqueues */

	struct mutex		mutex;		/* protects this wq */
	int			work_color;	/* WQ: current work color */
@@ -257,6 +258,13 @@ struct workqueue_struct {
#endif
	char			name[WQ_NAME_LEN]; /* I: workqueue name */

+	/*
+	 * Destruction of workqueue_struct is sched-RCU protected to allow
+	 * walking the workqueues list without grabbing wq_pool_mutex.
+	 * This is used to dump all workqueues from sysrq.
+	 */
+	struct rcu_head		rcu;
+
	/* hot fields used during command issue, aligned to cacheline */
	unsigned int		flags ____cacheline_aligned; /* WQ: WQ_* flags */
	struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */
@@ -288,7 +296,7 @@ static struct workqueue_attrs *wq_update_unbound_numa_attrs_buf;
static DEFINE_MUTEX(wq_pool_mutex);	/* protects pools and workqueues list */
static DEFINE_SPINLOCK(wq_mayday_lock);	/* protects wq->maydays list */

-static LIST_HEAD(workqueues);		/* PL: list of all workqueues */
+static LIST_HEAD(workqueues);		/* PR: list of all workqueues */
static bool workqueue_freezing;		/* PL: have wqs started freezing? */

/* the per-cpu worker pools */
@@ -324,6 +332,7 @@ EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq);
static int worker_thread(void *__worker);
static void copy_workqueue_attrs(struct workqueue_attrs *to,
				 const struct workqueue_attrs *from);
+static void workqueue_sysfs_unregister(struct workqueue_struct *wq);

#define CREATE_TRACE_POINTS
#include <trace/events/workqueue.h>
@@ -1911,9 +1920,11 @@ static bool manage_workers(struct worker *worker)
	 */
	if (!mutex_trylock(&pool->manager_arb))
		return false;
+	pool->manager = worker;

	maybe_create_worker(pool);

+	pool->manager = NULL;
	mutex_unlock(&pool->manager_arb);
	return true;
}
@@ -2303,6 +2314,7 @@ static int rescuer_thread(void *__rescuer)
struct wq_barrier {
	struct work_struct	work;
	struct completion	done;
+	struct task_struct	*task;	/* purely informational */
};

static void wq_barrier_func(struct work_struct *work)
@@ -2351,6 +2363,7 @@ static void insert_wq_barrier(struct pool_workqueue *pwq,
	INIT_WORK_ONSTACK(&barr->work, wq_barrier_func);
	__set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
	init_completion(&barr->done);
+	barr->task = current;

	/*
	 * If @target is currently being executed, schedule the
...@@ -2989,624 +3002,319 @@ int execute_in_process_context(work_func_t fn, struct execute_work *ew) ...@@ -2989,624 +3002,319 @@ int execute_in_process_context(work_func_t fn, struct execute_work *ew)
} }
EXPORT_SYMBOL_GPL(execute_in_process_context); EXPORT_SYMBOL_GPL(execute_in_process_context);
#ifdef CONFIG_SYSFS /**
/* * free_workqueue_attrs - free a workqueue_attrs
* Workqueues with WQ_SYSFS flag set is visible to userland via * @attrs: workqueue_attrs to free
* /sys/bus/workqueue/devices/WQ_NAME. All visible workqueues have the
* following attributes.
*
* per_cpu RO bool : whether the workqueue is per-cpu or unbound
* max_active RW int : maximum number of in-flight work items
*
* Unbound workqueues have the following extra attributes.
* *
* id RO int : the associated pool ID * Undo alloc_workqueue_attrs().
* nice RW int : nice value of the workers
* cpumask RW mask : bitmask of allowed CPUs for the workers
*/ */
struct wq_device { void free_workqueue_attrs(struct workqueue_attrs *attrs)
struct workqueue_struct *wq;
struct device dev;
};
static struct workqueue_struct *dev_to_wq(struct device *dev)
{ {
struct wq_device *wq_dev = container_of(dev, struct wq_device, dev); if (attrs) {
free_cpumask_var(attrs->cpumask);
return wq_dev->wq; kfree(attrs);
}
} }
static ssize_t per_cpu_show(struct device *dev, struct device_attribute *attr, /**
char *buf) * alloc_workqueue_attrs - allocate a workqueue_attrs
* @gfp_mask: allocation mask to use
*
* Allocate a new workqueue_attrs, initialize with default settings and
* return it.
*
* Return: The allocated new workqueue_attr on success. %NULL on failure.
*/
struct workqueue_attrs *alloc_workqueue_attrs(gfp_t gfp_mask)
{ {
struct workqueue_struct *wq = dev_to_wq(dev); struct workqueue_attrs *attrs;
return scnprintf(buf, PAGE_SIZE, "%d\n", (bool)!(wq->flags & WQ_UNBOUND)); attrs = kzalloc(sizeof(*attrs), gfp_mask);
if (!attrs)
goto fail;
if (!alloc_cpumask_var(&attrs->cpumask, gfp_mask))
goto fail;
cpumask_copy(attrs->cpumask, cpu_possible_mask);
return attrs;
fail:
free_workqueue_attrs(attrs);
return NULL;
} }
static DEVICE_ATTR_RO(per_cpu);
static ssize_t max_active_show(struct device *dev, static void copy_workqueue_attrs(struct workqueue_attrs *to,
struct device_attribute *attr, char *buf) const struct workqueue_attrs *from)
{ {
struct workqueue_struct *wq = dev_to_wq(dev); to->nice = from->nice;
cpumask_copy(to->cpumask, from->cpumask);
return scnprintf(buf, PAGE_SIZE, "%d\n", wq->saved_max_active); /*
* Unlike hash and equality test, this function doesn't ignore
* ->no_numa as it is used for both pool and wq attrs. Instead,
* get_unbound_pool() explicitly clears ->no_numa after copying.
*/
to->no_numa = from->no_numa;
} }
static ssize_t max_active_store(struct device *dev, /* hash value of the content of @attr */
struct device_attribute *attr, const char *buf, static u32 wqattrs_hash(const struct workqueue_attrs *attrs)
size_t count)
{ {
struct workqueue_struct *wq = dev_to_wq(dev); u32 hash = 0;
int val;
if (sscanf(buf, "%d", &val) != 1 || val <= 0)
return -EINVAL;
workqueue_set_max_active(wq, val); hash = jhash_1word(attrs->nice, hash);
return count; hash = jhash(cpumask_bits(attrs->cpumask),
BITS_TO_LONGS(nr_cpumask_bits) * sizeof(long), hash);
return hash;
} }
static DEVICE_ATTR_RW(max_active);
static struct attribute *wq_sysfs_attrs[] = {
&dev_attr_per_cpu.attr,
&dev_attr_max_active.attr,
NULL,
};
ATTRIBUTE_GROUPS(wq_sysfs);
static ssize_t wq_pool_ids_show(struct device *dev, /* content equality test */
struct device_attribute *attr, char *buf) static bool wqattrs_equal(const struct workqueue_attrs *a,
const struct workqueue_attrs *b)
{ {
struct workqueue_struct *wq = dev_to_wq(dev); if (a->nice != b->nice)
const char *delim = ""; return false;
int node, written = 0; if (!cpumask_equal(a->cpumask, b->cpumask))
return false;
rcu_read_lock_sched(); return true;
for_each_node(node) {
written += scnprintf(buf + written, PAGE_SIZE - written,
"%s%d:%d", delim, node,
unbound_pwq_by_node(wq, node)->pool->id);
delim = " ";
}
written += scnprintf(buf + written, PAGE_SIZE - written, "\n");
rcu_read_unlock_sched();
return written;
} }
static ssize_t wq_nice_show(struct device *dev, struct device_attribute *attr, /**
char *buf) * init_worker_pool - initialize a newly zalloc'd worker_pool
* @pool: worker_pool to initialize
*
* Initiailize a newly zalloc'd @pool. It also allocates @pool->attrs.
*
* Return: 0 on success, -errno on failure. Even on failure, all fields
* inside @pool proper are initialized and put_unbound_pool() can be called
* on @pool safely to release it.
*/
static int init_worker_pool(struct worker_pool *pool)
{ {
struct workqueue_struct *wq = dev_to_wq(dev); spin_lock_init(&pool->lock);
int written; pool->id = -1;
pool->cpu = -1;
pool->node = NUMA_NO_NODE;
pool->flags |= POOL_DISASSOCIATED;
INIT_LIST_HEAD(&pool->worklist);
INIT_LIST_HEAD(&pool->idle_list);
hash_init(pool->busy_hash);
mutex_lock(&wq->mutex); init_timer_deferrable(&pool->idle_timer);
written = scnprintf(buf, PAGE_SIZE, "%d\n", wq->unbound_attrs->nice); pool->idle_timer.function = idle_worker_timeout;
mutex_unlock(&wq->mutex); pool->idle_timer.data = (unsigned long)pool;
return written; setup_timer(&pool->mayday_timer, pool_mayday_timeout,
} (unsigned long)pool);
/* prepare workqueue_attrs for sysfs store operations */ mutex_init(&pool->manager_arb);
static struct workqueue_attrs *wq_sysfs_prep_attrs(struct workqueue_struct *wq) mutex_init(&pool->attach_mutex);
{ INIT_LIST_HEAD(&pool->workers);
struct workqueue_attrs *attrs;
attrs = alloc_workqueue_attrs(GFP_KERNEL); ida_init(&pool->worker_ida);
if (!attrs) INIT_HLIST_NODE(&pool->hash_node);
return NULL; pool->refcnt = 1;
mutex_lock(&wq->mutex); /* shouldn't fail above this point */
copy_workqueue_attrs(attrs, wq->unbound_attrs); pool->attrs = alloc_workqueue_attrs(GFP_KERNEL);
mutex_unlock(&wq->mutex); if (!pool->attrs)
return attrs; return -ENOMEM;
return 0;
} }
static ssize_t wq_nice_store(struct device *dev, struct device_attribute *attr, static void rcu_free_wq(struct rcu_head *rcu)
const char *buf, size_t count)
{ {
struct workqueue_struct *wq = dev_to_wq(dev); struct workqueue_struct *wq =
struct workqueue_attrs *attrs; container_of(rcu, struct workqueue_struct, rcu);
int ret;
attrs = wq_sysfs_prep_attrs(wq);
if (!attrs)
return -ENOMEM;
if (sscanf(buf, "%d", &attrs->nice) == 1 && if (!(wq->flags & WQ_UNBOUND))
attrs->nice >= MIN_NICE && attrs->nice <= MAX_NICE) free_percpu(wq->cpu_pwqs);
ret = apply_workqueue_attrs(wq, attrs);
else else
ret = -EINVAL; free_workqueue_attrs(wq->unbound_attrs);
free_workqueue_attrs(attrs); kfree(wq->rescuer);
return ret ?: count; kfree(wq);
} }
static ssize_t wq_cpumask_show(struct device *dev, static void rcu_free_pool(struct rcu_head *rcu)
struct device_attribute *attr, char *buf)
{ {
struct workqueue_struct *wq = dev_to_wq(dev); struct worker_pool *pool = container_of(rcu, struct worker_pool, rcu);
int written;
mutex_lock(&wq->mutex); ida_destroy(&pool->worker_ida);
written = scnprintf(buf, PAGE_SIZE, "%*pb\n", free_workqueue_attrs(pool->attrs);
cpumask_pr_args(wq->unbound_attrs->cpumask)); kfree(pool);
mutex_unlock(&wq->mutex);
return written;
} }
static ssize_t wq_cpumask_store(struct device *dev, /**
struct device_attribute *attr, * put_unbound_pool - put a worker_pool
const char *buf, size_t count) * @pool: worker_pool to put
*
* Put @pool. If its refcnt reaches zero, it gets destroyed in sched-RCU
* safe manner. get_unbound_pool() calls this function on its failure path
* and this function should be able to release pools which went through,
* successfully or not, init_worker_pool().
*
* Should be called with wq_pool_mutex held.
*/
static void put_unbound_pool(struct worker_pool *pool)
{ {
struct workqueue_struct *wq = dev_to_wq(dev); DECLARE_COMPLETION_ONSTACK(detach_completion);
struct workqueue_attrs *attrs; struct worker *worker;
int ret;
attrs = wq_sysfs_prep_attrs(wq); lockdep_assert_held(&wq_pool_mutex);
if (!attrs)
return -ENOMEM;
ret = cpumask_parse(buf, attrs->cpumask); if (--pool->refcnt)
if (!ret) return;
ret = apply_workqueue_attrs(wq, attrs);
free_workqueue_attrs(attrs); /* sanity checks */
return ret ?: count; if (WARN_ON(!(pool->cpu < 0)) ||
} WARN_ON(!list_empty(&pool->worklist)))
return;
static ssize_t wq_numa_show(struct device *dev, struct device_attribute *attr, /* release id and unhash */
char *buf) if (pool->id >= 0)
{ idr_remove(&worker_pool_idr, pool->id);
struct workqueue_struct *wq = dev_to_wq(dev); hash_del(&pool->hash_node);
int written;
mutex_lock(&wq->mutex);
written = scnprintf(buf, PAGE_SIZE, "%d\n",
!wq->unbound_attrs->no_numa);
mutex_unlock(&wq->mutex);
return written;
}
static ssize_t wq_numa_store(struct device *dev, struct device_attribute *attr,
const char *buf, size_t count)
{
struct workqueue_struct *wq = dev_to_wq(dev);
struct workqueue_attrs *attrs;
int v, ret;
attrs = wq_sysfs_prep_attrs(wq);
if (!attrs)
return -ENOMEM;
ret = -EINVAL; /*
if (sscanf(buf, "%d", &v) == 1) { * Become the manager and destroy all workers. Grabbing
attrs->no_numa = !v; * manager_arb prevents @pool's workers from blocking on
ret = apply_workqueue_attrs(wq, attrs); * attach_mutex.
} */
mutex_lock(&pool->manager_arb);
free_workqueue_attrs(attrs); spin_lock_irq(&pool->lock);
return ret ?: count; while ((worker = first_idle_worker(pool)))
} destroy_worker(worker);
WARN_ON(pool->nr_workers || pool->nr_idle);
spin_unlock_irq(&pool->lock);
static struct device_attribute wq_sysfs_unbound_attrs[] = { mutex_lock(&pool->attach_mutex);
__ATTR(pool_ids, 0444, wq_pool_ids_show, NULL), if (!list_empty(&pool->workers))
__ATTR(nice, 0644, wq_nice_show, wq_nice_store), pool->detach_completion = &detach_completion;
__ATTR(cpumask, 0644, wq_cpumask_show, wq_cpumask_store), mutex_unlock(&pool->attach_mutex);
__ATTR(numa, 0644, wq_numa_show, wq_numa_store),
__ATTR_NULL,
};
static struct bus_type wq_subsys = { if (pool->detach_completion)
.name = "workqueue", wait_for_completion(pool->detach_completion);
.dev_groups = wq_sysfs_groups,
};
static int __init wq_sysfs_init(void) mutex_unlock(&pool->manager_arb);
{
return subsys_virtual_register(&wq_subsys, NULL);
}
core_initcall(wq_sysfs_init);
static void wq_device_release(struct device *dev) /* shut down the timers */
{ del_timer_sync(&pool->idle_timer);
struct wq_device *wq_dev = container_of(dev, struct wq_device, dev); del_timer_sync(&pool->mayday_timer);
kfree(wq_dev); /* sched-RCU protected to allow dereferences from get_work_pool() */
call_rcu_sched(&pool->rcu, rcu_free_pool);
} }
/** /**
* workqueue_sysfs_register - make a workqueue visible in sysfs * get_unbound_pool - get a worker_pool with the specified attributes
* @wq: the workqueue to register * @attrs: the attributes of the worker_pool to get
* *
* Expose @wq in sysfs under /sys/bus/workqueue/devices. * Obtain a worker_pool which has the same attributes as @attrs, bump the
* alloc_workqueue*() automatically calls this function if WQ_SYSFS is set * reference count and return it. If there already is a matching
* which is the preferred method. * worker_pool, it will be used; otherwise, this function attempts to
* create a new one.
* *
* Workqueue user should use this function directly iff it wants to apply * Should be called with wq_pool_mutex held.
* workqueue_attrs before making the workqueue visible in sysfs; otherwise,
* apply_workqueue_attrs() may race against userland updating the
* attributes.
* *
* Return: 0 on success, -errno on failure. * Return: On success, a worker_pool with the same attributes as @attrs.
* On failure, %NULL.
*/ */
int workqueue_sysfs_register(struct workqueue_struct *wq) static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
{ {
struct wq_device *wq_dev; u32 hash = wqattrs_hash(attrs);
int ret; struct worker_pool *pool;
int node;
/* lockdep_assert_held(&wq_pool_mutex);
* Adjusting max_active or creating new pwqs by applyting
* attributes breaks ordering guarantee. Disallow exposing ordered
* workqueues.
*/
if (WARN_ON(wq->flags & __WQ_ORDERED))
return -EINVAL;
wq->wq_dev = wq_dev = kzalloc(sizeof(*wq_dev), GFP_KERNEL); /* do we already have a matching pool? */
if (!wq_dev) hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
return -ENOMEM; if (wqattrs_equal(pool->attrs, attrs)) {
pool->refcnt++;
return pool;
}
}
wq_dev->wq = wq; /* nope, create a new one */
wq_dev->dev.bus = &wq_subsys; pool = kzalloc(sizeof(*pool), GFP_KERNEL);
wq_dev->dev.init_name = wq->name; if (!pool || init_worker_pool(pool) < 0)
wq_dev->dev.release = wq_device_release; goto fail;
lockdep_set_subclass(&pool->lock, 1); /* see put_pwq() */
copy_workqueue_attrs(pool->attrs, attrs);
/* /*
* unbound_attrs are created separately. Suppress uevent until * no_numa isn't a worker_pool attribute, always clear it. See
* everything is ready. * 'struct workqueue_attrs' comments for detail.
*/ */
dev_set_uevent_suppress(&wq_dev->dev, true); pool->attrs->no_numa = false;
ret = device_register(&wq_dev->dev);
if (ret) {
kfree(wq_dev);
wq->wq_dev = NULL;
return ret;
}
if (wq->flags & WQ_UNBOUND) {
struct device_attribute *attr;
for (attr = wq_sysfs_unbound_attrs; attr->attr.name; attr++) { /* if cpumask is contained inside a NUMA node, we belong to that node */
ret = device_create_file(&wq_dev->dev, attr); if (wq_numa_enabled) {
if (ret) { for_each_node(node) {
device_unregister(&wq_dev->dev); if (cpumask_subset(pool->attrs->cpumask,
wq->wq_dev = NULL; wq_numa_possible_cpumask[node])) {
return ret; pool->node = node;
break;
} }
} }
} }
dev_set_uevent_suppress(&wq_dev->dev, false); if (worker_pool_assign_id(pool) < 0)
kobject_uevent(&wq_dev->dev.kobj, KOBJ_ADD); goto fail;
return 0;
}
/** /* create and start the initial worker */
* workqueue_sysfs_unregister - undo workqueue_sysfs_register() if (!create_worker(pool))
* @wq: the workqueue to unregister goto fail;
*
* If @wq is registered to sysfs by workqueue_sysfs_register(), unregister.
*/
static void workqueue_sysfs_unregister(struct workqueue_struct *wq)
{
struct wq_device *wq_dev = wq->wq_dev;
if (!wq->wq_dev) /* install */
return; hash_add(unbound_pool_hash, &pool->hash_node, hash);
wq->wq_dev = NULL; return pool;
device_unregister(&wq_dev->dev); fail:
if (pool)
put_unbound_pool(pool);
return NULL;
} }
#else /* CONFIG_SYSFS */
static void workqueue_sysfs_unregister(struct workqueue_struct *wq) { }
#endif /* CONFIG_SYSFS */
/** static void rcu_free_pwq(struct rcu_head *rcu)
* free_workqueue_attrs - free a workqueue_attrs
* @attrs: workqueue_attrs to free
*
* Undo alloc_workqueue_attrs().
*/
void free_workqueue_attrs(struct workqueue_attrs *attrs)
{ {
if (attrs) { kmem_cache_free(pwq_cache,
free_cpumask_var(attrs->cpumask); container_of(rcu, struct pool_workqueue, rcu));
kfree(attrs);
}
} }
/** /*
* alloc_workqueue_attrs - allocate a workqueue_attrs * Scheduled on system_wq by put_pwq() when an unbound pwq hits zero refcnt
* @gfp_mask: allocation mask to use * and needs to be destroyed.
*
* Allocate a new workqueue_attrs, initialize with default settings and
* return it.
*
* Return: The allocated new workqueue_attr on success. %NULL on failure.
*/ */
struct workqueue_attrs *alloc_workqueue_attrs(gfp_t gfp_mask) static void pwq_unbound_release_workfn(struct work_struct *work)
{ {
struct workqueue_attrs *attrs; struct pool_workqueue *pwq = container_of(work, struct pool_workqueue,
unbound_release_work);
struct workqueue_struct *wq = pwq->wq;
struct worker_pool *pool = pwq->pool;
bool is_last;
attrs = kzalloc(sizeof(*attrs), gfp_mask); if (WARN_ON_ONCE(!(wq->flags & WQ_UNBOUND)))
if (!attrs) return;
goto fail;
if (!alloc_cpumask_var(&attrs->cpumask, gfp_mask))
goto fail;
cpumask_copy(attrs->cpumask, cpu_possible_mask); mutex_lock(&wq->mutex);
return attrs; list_del_rcu(&pwq->pwqs_node);
fail: is_last = list_empty(&wq->pwqs);
free_workqueue_attrs(attrs); mutex_unlock(&wq->mutex);
return NULL;
} mutex_lock(&wq_pool_mutex);
put_unbound_pool(pool);
mutex_unlock(&wq_pool_mutex);
call_rcu_sched(&pwq->rcu, rcu_free_pwq);
static void copy_workqueue_attrs(struct workqueue_attrs *to,
const struct workqueue_attrs *from)
{
to->nice = from->nice;
cpumask_copy(to->cpumask, from->cpumask);
/* /*
* Unlike hash and equality test, this function doesn't ignore * If we're the last pwq going away, @wq is already dead and no one
* ->no_numa as it is used for both pool and wq attrs. Instead, * is gonna access it anymore. Schedule RCU free.
* get_unbound_pool() explicitly clears ->no_numa after copying.
*/ */
to->no_numa = from->no_numa; if (is_last)
} call_rcu_sched(&wq->rcu, rcu_free_wq);
/* hash value of the content of @attr */
static u32 wqattrs_hash(const struct workqueue_attrs *attrs)
{
u32 hash = 0;
hash = jhash_1word(attrs->nice, hash);
hash = jhash(cpumask_bits(attrs->cpumask),
BITS_TO_LONGS(nr_cpumask_bits) * sizeof(long), hash);
return hash;
}
/* content equality test */
static bool wqattrs_equal(const struct workqueue_attrs *a,
const struct workqueue_attrs *b)
{
if (a->nice != b->nice)
return false;
if (!cpumask_equal(a->cpumask, b->cpumask))
return false;
return true;
}
/**
* init_worker_pool - initialize a newly zalloc'd worker_pool
* @pool: worker_pool to initialize
*
* Initiailize a newly zalloc'd @pool. It also allocates @pool->attrs.
*
* Return: 0 on success, -errno on failure. Even on failure, all fields
* inside @pool proper are initialized and put_unbound_pool() can be called
* on @pool safely to release it.
*/
static int init_worker_pool(struct worker_pool *pool)
{
spin_lock_init(&pool->lock);
pool->id = -1;
pool->cpu = -1;
pool->node = NUMA_NO_NODE;
pool->flags |= POOL_DISASSOCIATED;
INIT_LIST_HEAD(&pool->worklist);
INIT_LIST_HEAD(&pool->idle_list);
hash_init(pool->busy_hash);
init_timer_deferrable(&pool->idle_timer);
pool->idle_timer.function = idle_worker_timeout;
pool->idle_timer.data = (unsigned long)pool;
setup_timer(&pool->mayday_timer, pool_mayday_timeout,
(unsigned long)pool);
mutex_init(&pool->manager_arb);
mutex_init(&pool->attach_mutex);
INIT_LIST_HEAD(&pool->workers);
ida_init(&pool->worker_ida);
INIT_HLIST_NODE(&pool->hash_node);
pool->refcnt = 1;
/* shouldn't fail above this point */
pool->attrs = alloc_workqueue_attrs(GFP_KERNEL);
if (!pool->attrs)
return -ENOMEM;
return 0;
}
static void rcu_free_pool(struct rcu_head *rcu)
{
struct worker_pool *pool = container_of(rcu, struct worker_pool, rcu);
ida_destroy(&pool->worker_ida);
free_workqueue_attrs(pool->attrs);
kfree(pool);
}
/**
* put_unbound_pool - put a worker_pool
* @pool: worker_pool to put
*
* Put @pool. If its refcnt reaches zero, it gets destroyed in sched-RCU
* safe manner. get_unbound_pool() calls this function on its failure path
* and this function should be able to release pools which went through,
* successfully or not, init_worker_pool().
*
* Should be called with wq_pool_mutex held.
*/
static void put_unbound_pool(struct worker_pool *pool)
{
DECLARE_COMPLETION_ONSTACK(detach_completion);
struct worker *worker;
lockdep_assert_held(&wq_pool_mutex);
if (--pool->refcnt)
return;
/* sanity checks */
if (WARN_ON(!(pool->cpu < 0)) ||
WARN_ON(!list_empty(&pool->worklist)))
return;
/* release id and unhash */
if (pool->id >= 0)
idr_remove(&worker_pool_idr, pool->id);
hash_del(&pool->hash_node);
/*
* Become the manager and destroy all workers. Grabbing
* manager_arb prevents @pool's workers from blocking on
* attach_mutex.
*/
mutex_lock(&pool->manager_arb);
spin_lock_irq(&pool->lock);
while ((worker = first_idle_worker(pool)))
destroy_worker(worker);
WARN_ON(pool->nr_workers || pool->nr_idle);
spin_unlock_irq(&pool->lock);
mutex_lock(&pool->attach_mutex);
if (!list_empty(&pool->workers))
pool->detach_completion = &detach_completion;
mutex_unlock(&pool->attach_mutex);
if (pool->detach_completion)
wait_for_completion(pool->detach_completion);
mutex_unlock(&pool->manager_arb);
/* shut down the timers */
del_timer_sync(&pool->idle_timer);
del_timer_sync(&pool->mayday_timer);
/* sched-RCU protected to allow dereferences from get_work_pool() */
call_rcu_sched(&pool->rcu, rcu_free_pool);
}
/**
* get_unbound_pool - get a worker_pool with the specified attributes
* @attrs: the attributes of the worker_pool to get
*
* Obtain a worker_pool which has the same attributes as @attrs, bump the
* reference count and return it. If there already is a matching
* worker_pool, it will be used; otherwise, this function attempts to
* create a new one.
*
* Should be called with wq_pool_mutex held.
*
* Return: On success, a worker_pool with the same attributes as @attrs.
* On failure, %NULL.
*/
static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
{
u32 hash = wqattrs_hash(attrs);
struct worker_pool *pool;
int node;
lockdep_assert_held(&wq_pool_mutex);
/* do we already have a matching pool? */
hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
if (wqattrs_equal(pool->attrs, attrs)) {
pool->refcnt++;
return pool;
}
}
/* nope, create a new one */
pool = kzalloc(sizeof(*pool), GFP_KERNEL);
if (!pool || init_worker_pool(pool) < 0)
goto fail;
lockdep_set_subclass(&pool->lock, 1); /* see put_pwq() */
copy_workqueue_attrs(pool->attrs, attrs);
/*
* no_numa isn't a worker_pool attribute, always clear it. See
* 'struct workqueue_attrs' comments for detail.
*/
pool->attrs->no_numa = false;
/* if cpumask is contained inside a NUMA node, we belong to that node */
if (wq_numa_enabled) {
for_each_node(node) {
if (cpumask_subset(pool->attrs->cpumask,
wq_numa_possible_cpumask[node])) {
pool->node = node;
break;
}
}
}
if (worker_pool_assign_id(pool) < 0)
goto fail;
/* create and start the initial worker */
if (!create_worker(pool))
goto fail;
/* install */
hash_add(unbound_pool_hash, &pool->hash_node, hash);
return pool;
fail:
if (pool)
put_unbound_pool(pool);
return NULL;
}
static void rcu_free_pwq(struct rcu_head *rcu)
{
kmem_cache_free(pwq_cache,
container_of(rcu, struct pool_workqueue, rcu));
}
/*
* Scheduled on system_wq by put_pwq() when an unbound pwq hits zero refcnt
* and needs to be destroyed.
*/
static void pwq_unbound_release_workfn(struct work_struct *work)
{
struct pool_workqueue *pwq = container_of(work, struct pool_workqueue,
unbound_release_work);
struct workqueue_struct *wq = pwq->wq;
struct worker_pool *pool = pwq->pool;
bool is_last;
if (WARN_ON_ONCE(!(wq->flags & WQ_UNBOUND)))
return;
mutex_lock(&wq->mutex);
list_del_rcu(&pwq->pwqs_node);
is_last = list_empty(&wq->pwqs);
mutex_unlock(&wq->mutex);
mutex_lock(&wq_pool_mutex);
put_unbound_pool(pool);
mutex_unlock(&wq_pool_mutex);
call_rcu_sched(&pwq->rcu, rcu_free_pwq);
/*
* If we're the last pwq going away, @wq is already dead and no one
* is gonna access it anymore. Free it.
*/
if (is_last) {
free_workqueue_attrs(wq->unbound_attrs);
kfree(wq);
}
} }
/** /**
@@ -4143,7 +3851,7 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
		pwq_adjust_max_active(pwq);
	mutex_unlock(&wq->mutex);

-	list_add(&wq->list, &workqueues);
+	list_add_tail_rcu(&wq->list, &workqueues);

	mutex_unlock(&wq_pool_mutex);
@@ -4199,24 +3907,20 @@ void destroy_workqueue(struct workqueue_struct *wq)
	 * flushing is complete in case freeze races us.
	 */
	mutex_lock(&wq_pool_mutex);
-	list_del_init(&wq->list);
+	list_del_rcu(&wq->list);
	mutex_unlock(&wq_pool_mutex);

	workqueue_sysfs_unregister(wq);

-	if (wq->rescuer) {
+	if (wq->rescuer)
		kthread_stop(wq->rescuer->task);
-		kfree(wq->rescuer);
-		wq->rescuer = NULL;
-	}

	if (!(wq->flags & WQ_UNBOUND)) {
		/*
		 * The base ref is never dropped on per-cpu pwqs. Directly
-		 * free the pwqs and wq.
+		 * schedule RCU free.
		 */
-		free_percpu(wq->cpu_pwqs);
-		kfree(wq);
+		call_rcu_sched(&wq->rcu, rcu_free_wq);
	} else {
		/*
		 * We're the sole accessor of @wq at this point. Directly
@@ -4437,13 +4141,173 @@ void print_worker_info(const char *log_lvl, struct task_struct *task)
	}
}

static void pr_cont_pool_info(struct worker_pool *pool)
{
	pr_cont(" cpus=%*pbl", nr_cpumask_bits, pool->attrs->cpumask);
	if (pool->node != NUMA_NO_NODE)
		pr_cont(" node=%d", pool->node);
	pr_cont(" flags=0x%x nice=%d", pool->flags, pool->attrs->nice);
}
static void pr_cont_work(bool comma, struct work_struct *work)
{
if (work->func == wq_barrier_func) {
struct wq_barrier *barr;
barr = container_of(work, struct wq_barrier, work);
pr_cont("%s BAR(%d)", comma ? "," : "",
task_pid_nr(barr->task));
} else {
pr_cont("%s %pf", comma ? "," : "", work->func);
}
}
static void show_pwq(struct pool_workqueue *pwq)
{
struct worker_pool *pool = pwq->pool;
struct work_struct *work;
struct worker *worker;
bool has_in_flight = false, has_pending = false;
int bkt;
pr_info(" pwq %d:", pool->id);
pr_cont_pool_info(pool);
pr_cont(" active=%d/%d%s\n", pwq->nr_active, pwq->max_active,
!list_empty(&pwq->mayday_node) ? " MAYDAY" : "");
hash_for_each(pool->busy_hash, bkt, worker, hentry) {
if (worker->current_pwq == pwq) {
has_in_flight = true;
break;
}
}
if (has_in_flight) {
bool comma = false;
pr_info(" in-flight:");
hash_for_each(pool->busy_hash, bkt, worker, hentry) {
if (worker->current_pwq != pwq)
continue;
pr_cont("%s %d%s:%pf", comma ? "," : "",
task_pid_nr(worker->task),
worker == pwq->wq->rescuer ? "(RESCUER)" : "",
worker->current_func);
list_for_each_entry(work, &worker->scheduled, entry)
pr_cont_work(false, work);
comma = true;
}
pr_cont("\n");
}
list_for_each_entry(work, &pool->worklist, entry) {
if (get_work_pwq(work) == pwq) {
has_pending = true;
break;
}
}
if (has_pending) {
bool comma = false;
pr_info(" pending:");
list_for_each_entry(work, &pool->worklist, entry) {
if (get_work_pwq(work) != pwq)
continue;
pr_cont_work(comma, work);
comma = !(*work_data_bits(work) & WORK_STRUCT_LINKED);
}
pr_cont("\n");
}
if (!list_empty(&pwq->delayed_works)) {
bool comma = false;
pr_info(" delayed:");
list_for_each_entry(work, &pwq->delayed_works, entry) {
pr_cont_work(comma, work);
comma = !(*work_data_bits(work) & WORK_STRUCT_LINKED);
}
pr_cont("\n");
}
}
/**
* show_workqueue_state - dump workqueue state
*
* Called from a sysrq handler and prints out all busy workqueues and
* pools.
*/
void show_workqueue_state(void)
{
struct workqueue_struct *wq;
struct worker_pool *pool;
unsigned long flags;
int pi;
rcu_read_lock_sched();
pr_info("Showing busy workqueues and worker pools:\n");
list_for_each_entry_rcu(wq, &workqueues, list) {
struct pool_workqueue *pwq;
bool idle = true;
for_each_pwq(pwq, wq) {
if (pwq->nr_active || !list_empty(&pwq->delayed_works)) {
idle = false;
break;
}
}
if (idle)
continue;
pr_info("workqueue %s: flags=0x%x\n", wq->name, wq->flags);
for_each_pwq(pwq, wq) {
spin_lock_irqsave(&pwq->pool->lock, flags);
if (pwq->nr_active || !list_empty(&pwq->delayed_works))
show_pwq(pwq);
spin_unlock_irqrestore(&pwq->pool->lock, flags);
}
}
for_each_pool(pool, pi) {
struct worker *worker;
bool first = true;
spin_lock_irqsave(&pool->lock, flags);
if (pool->nr_workers == pool->nr_idle)
goto next_pool;
pr_info("pool %d:", pool->id);
pr_cont_pool_info(pool);
pr_cont(" workers=%d", pool->nr_workers);
if (pool->manager)
pr_cont(" manager: %d",
task_pid_nr(pool->manager->task));
list_for_each_entry(worker, &pool->idle_list, entry) {
pr_cont(" %s%d", first ? "idle: " : "",
task_pid_nr(worker->task));
first = false;
}
pr_cont("\n");
next_pool:
spin_unlock_irqrestore(&pool->lock, flags);
}
rcu_read_unlock_sched();
}
/*
* CPU hotplug.
*
* There are two challenges in supporting CPU hotplug. Firstly, there
* are a lot of assumptions on strong associations among work, pwq and
* pool which make migrating pending and scheduled works very
* difficult to implement without impacting hot paths. Secondly,
 * worker pools serve mix of short, long and very long running works making
 * blocked draining impractical.
 *
@@ -4637,202 +4501,519 @@ static int workqueue_cpu_up_callback(struct notifier_block *nfb,
		else if (pool->cpu < 0)
			restore_unbound_workers_cpumask(pool, cpu);
		mutex_unlock(&pool->attach_mutex);
	}
/* update NUMA affinity of unbound workqueues */
list_for_each_entry(wq, &workqueues, list)
wq_update_unbound_numa(wq, cpu, true);
mutex_unlock(&wq_pool_mutex);
break;
}
return NOTIFY_OK;
}
/*
* Workqueues should be brought down after normal priority CPU notifiers.
* This will be registered as low priority CPU notifier.
*/
static int workqueue_cpu_down_callback(struct notifier_block *nfb,
unsigned long action,
void *hcpu)
{
int cpu = (unsigned long)hcpu;
struct work_struct unbind_work;
struct workqueue_struct *wq;
switch (action & ~CPU_TASKS_FROZEN) {
case CPU_DOWN_PREPARE:
/* unbinding per-cpu workers should happen on the local CPU */
INIT_WORK_ONSTACK(&unbind_work, wq_unbind_fn);
queue_work_on(cpu, system_highpri_wq, &unbind_work);
/* update NUMA affinity of unbound workqueues */
mutex_lock(&wq_pool_mutex);
list_for_each_entry(wq, &workqueues, list)
wq_update_unbound_numa(wq, cpu, false);
mutex_unlock(&wq_pool_mutex);
/* wait for per-cpu unbinding to finish */
flush_work(&unbind_work);
destroy_work_on_stack(&unbind_work);
break;
}
return NOTIFY_OK;
}
#ifdef CONFIG_SMP
struct work_for_cpu {
struct work_struct work;
long (*fn)(void *);
void *arg;
long ret;
};
static void work_for_cpu_fn(struct work_struct *work)
{
struct work_for_cpu *wfc = container_of(work, struct work_for_cpu, work);
wfc->ret = wfc->fn(wfc->arg);
}
/**
* work_on_cpu - run a function in user context on a particular cpu
* @cpu: the cpu to run on
* @fn: the function to run
* @arg: the function arg
*
* It is up to the caller to ensure that the cpu doesn't go offline.
* The caller must not hold any locks which would prevent @fn from completing.
*
* Return: The value @fn returns.
*/
long work_on_cpu(int cpu, long (*fn)(void *), void *arg)
{
struct work_for_cpu wfc = { .fn = fn, .arg = arg };
INIT_WORK_ONSTACK(&wfc.work, work_for_cpu_fn);
schedule_work_on(cpu, &wfc.work);
flush_work(&wfc.work);
destroy_work_on_stack(&wfc.work);
return wfc.ret;
}
EXPORT_SYMBOL_GPL(work_on_cpu);
#endif /* CONFIG_SMP */
#ifdef CONFIG_FREEZER
/**
* freeze_workqueues_begin - begin freezing workqueues
*
* Start freezing workqueues. After this function returns, all freezable
* workqueues will queue new works to their delayed_works list instead of
* pool->worklist.
*
* CONTEXT:
* Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's.
*/
void freeze_workqueues_begin(void)
{
struct workqueue_struct *wq;
struct pool_workqueue *pwq;
mutex_lock(&wq_pool_mutex);
WARN_ON_ONCE(workqueue_freezing);
workqueue_freezing = true;
list_for_each_entry(wq, &workqueues, list) {
mutex_lock(&wq->mutex);
for_each_pwq(pwq, wq)
pwq_adjust_max_active(pwq);
mutex_unlock(&wq->mutex);
}
mutex_unlock(&wq_pool_mutex);
}
/**
* freeze_workqueues_busy - are freezable workqueues still busy?
*
* Check whether freezing is complete. This function must be called
* between freeze_workqueues_begin() and thaw_workqueues().
*
* CONTEXT:
* Grabs and releases wq_pool_mutex.
*
* Return:
* %true if some freezable workqueues are still busy. %false if freezing
* is complete.
*/
bool freeze_workqueues_busy(void)
{
bool busy = false;
struct workqueue_struct *wq;
struct pool_workqueue *pwq;
mutex_lock(&wq_pool_mutex);
WARN_ON_ONCE(!workqueue_freezing);
list_for_each_entry(wq, &workqueues, list) {
if (!(wq->flags & WQ_FREEZABLE))
continue;
/*
* nr_active is monotonically decreasing. It's safe
* to peek without lock.
*/
rcu_read_lock_sched();
for_each_pwq(pwq, wq) {
WARN_ON_ONCE(pwq->nr_active < 0);
if (pwq->nr_active) {
busy = true;
rcu_read_unlock_sched();
goto out_unlock;
}
}
rcu_read_unlock_sched();
}
out_unlock:
mutex_unlock(&wq_pool_mutex);
return busy;
}
/**
* thaw_workqueues - thaw workqueues
*
* Thaw workqueues. Normal queueing is restored and all collected
* frozen works are transferred to their respective pool worklists.
*
* CONTEXT:
* Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's.
*/
void thaw_workqueues(void)
{
struct workqueue_struct *wq;
struct pool_workqueue *pwq;
mutex_lock(&wq_pool_mutex);
if (!workqueue_freezing)
goto out_unlock;
workqueue_freezing = false;
/* restore max_active and repopulate worklist */
list_for_each_entry(wq, &workqueues, list) {
mutex_lock(&wq->mutex);
for_each_pwq(pwq, wq)
pwq_adjust_max_active(pwq);
mutex_unlock(&wq->mutex);
}
out_unlock:
mutex_unlock(&wq_pool_mutex);
}
#endif /* CONFIG_FREEZER */
#ifdef CONFIG_SYSFS
/*
* Workqueues with WQ_SYSFS flag set is visible to userland via
* /sys/bus/workqueue/devices/WQ_NAME. All visible workqueues have the
* following attributes.
*
* per_cpu RO bool : whether the workqueue is per-cpu or unbound
* max_active RW int : maximum number of in-flight work items
*
* Unbound workqueues have the following extra attributes.
*
* id RO int : the associated pool ID
* nice RW int : nice value of the workers
* cpumask RW mask : bitmask of allowed CPUs for the workers
*/
struct wq_device {
struct workqueue_struct *wq;
struct device dev;
};
static struct workqueue_struct *dev_to_wq(struct device *dev)
{
struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
return wq_dev->wq;
}
static ssize_t per_cpu_show(struct device *dev, struct device_attribute *attr,
char *buf)
{
struct workqueue_struct *wq = dev_to_wq(dev);
return scnprintf(buf, PAGE_SIZE, "%d\n", (bool)!(wq->flags & WQ_UNBOUND));
}
static DEVICE_ATTR_RO(per_cpu);
static ssize_t max_active_show(struct device *dev,
struct device_attribute *attr, char *buf)
{
struct workqueue_struct *wq = dev_to_wq(dev);
return scnprintf(buf, PAGE_SIZE, "%d\n", wq->saved_max_active);
}
static ssize_t max_active_store(struct device *dev,
struct device_attribute *attr, const char *buf,
size_t count)
{
struct workqueue_struct *wq = dev_to_wq(dev);
int val;
if (sscanf(buf, "%d", &val) != 1 || val <= 0)
return -EINVAL;
workqueue_set_max_active(wq, val);
return count;
}
static DEVICE_ATTR_RW(max_active);
static struct attribute *wq_sysfs_attrs[] = {
&dev_attr_per_cpu.attr,
&dev_attr_max_active.attr,
NULL,
};
ATTRIBUTE_GROUPS(wq_sysfs);
static ssize_t wq_pool_ids_show(struct device *dev,
struct device_attribute *attr, char *buf)
{
struct workqueue_struct *wq = dev_to_wq(dev);
const char *delim = "";
int node, written = 0;
rcu_read_lock_sched();
for_each_node(node) {
written += scnprintf(buf + written, PAGE_SIZE - written,
"%s%d:%d", delim, node,
unbound_pwq_by_node(wq, node)->pool->id);
delim = " ";
}
written += scnprintf(buf + written, PAGE_SIZE - written, "\n");
rcu_read_unlock_sched();
return written;
}
static ssize_t wq_nice_show(struct device *dev, struct device_attribute *attr,
char *buf)
{
struct workqueue_struct *wq = dev_to_wq(dev);
int written;
mutex_lock(&wq->mutex);
written = scnprintf(buf, PAGE_SIZE, "%d\n", wq->unbound_attrs->nice);
mutex_unlock(&wq->mutex);
return written;
}
/* prepare workqueue_attrs for sysfs store operations */
static struct workqueue_attrs *wq_sysfs_prep_attrs(struct workqueue_struct *wq)
{
struct workqueue_attrs *attrs;
attrs = alloc_workqueue_attrs(GFP_KERNEL);
if (!attrs)
return NULL;
mutex_lock(&wq->mutex);
copy_workqueue_attrs(attrs, wq->unbound_attrs);
mutex_unlock(&wq->mutex);
return attrs;
}
static ssize_t wq_nice_store(struct device *dev, struct device_attribute *attr,
const char *buf, size_t count)
{
struct workqueue_struct *wq = dev_to_wq(dev);
struct workqueue_attrs *attrs;
int ret;
attrs = wq_sysfs_prep_attrs(wq);
if (!attrs)
return -ENOMEM;
if (sscanf(buf, "%d", &attrs->nice) == 1 &&
attrs->nice >= MIN_NICE && attrs->nice <= MAX_NICE)
ret = apply_workqueue_attrs(wq, attrs);
else
ret = -EINVAL;
free_workqueue_attrs(attrs);
return ret ?: count;
}
static ssize_t wq_cpumask_show(struct device *dev,
struct device_attribute *attr, char *buf)
{
struct workqueue_struct *wq = dev_to_wq(dev);
int written;
mutex_lock(&wq->mutex);
written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
cpumask_pr_args(wq->unbound_attrs->cpumask));
mutex_unlock(&wq->mutex);
return written;
}
static ssize_t wq_cpumask_store(struct device *dev,
struct device_attribute *attr,
const char *buf, size_t count)
{
struct workqueue_struct *wq = dev_to_wq(dev);
struct workqueue_attrs *attrs;
int ret;
attrs = wq_sysfs_prep_attrs(wq);
if (!attrs)
return -ENOMEM;
ret = cpumask_parse(buf, attrs->cpumask);
if (!ret)
ret = apply_workqueue_attrs(wq, attrs);
free_workqueue_attrs(attrs);
return ret ?: count;
}
static ssize_t wq_numa_show(struct device *dev, struct device_attribute *attr,
char *buf)
{
struct workqueue_struct *wq = dev_to_wq(dev);
int written;

	mutex_lock(&wq->mutex);
	written = scnprintf(buf, PAGE_SIZE, "%d\n",
			    !wq->unbound_attrs->no_numa);
	mutex_unlock(&wq->mutex);

	return written;
}

static ssize_t wq_numa_store(struct device *dev, struct device_attribute *attr,
			     const char *buf, size_t count)
{
	struct workqueue_struct *wq = dev_to_wq(dev);
	struct workqueue_attrs *attrs;
	int v, ret;

	attrs = wq_sysfs_prep_attrs(wq);
	if (!attrs)
		return -ENOMEM;

	ret = -EINVAL;
	if (sscanf(buf, "%d", &v) == 1) {
		attrs->no_numa = !v;
		ret = apply_workqueue_attrs(wq, attrs);
	}

	free_workqueue_attrs(attrs);
	return ret ?: count;
}

static struct device_attribute wq_sysfs_unbound_attrs[] = {
	__ATTR(pool_ids, 0444, wq_pool_ids_show, NULL),
	__ATTR(nice, 0644, wq_nice_show, wq_nice_store),
	__ATTR(cpumask, 0644, wq_cpumask_show, wq_cpumask_store),
	__ATTR(numa, 0644, wq_numa_show, wq_numa_store),
	__ATTR_NULL,
};

static struct bus_type wq_subsys = {
	.name				= "workqueue",
	.dev_groups			= wq_sysfs_groups,
};

static int __init wq_sysfs_init(void)
{
	return subsys_virtual_register(&wq_subsys, NULL);
}
core_initcall(wq_sysfs_init);

static void wq_device_release(struct device *dev)
{
	struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);

	kfree(wq_dev);
}

/**
 * workqueue_sysfs_register - make a workqueue visible in sysfs
 * @wq: the workqueue to register
 *
 * Expose @wq in sysfs under /sys/bus/workqueue/devices.
 * alloc_workqueue*() automatically calls this function if WQ_SYSFS is set
 * which is the preferred method.
 *
 * Workqueue user should use this function directly iff it wants to apply
 * workqueue_attrs before making the workqueue visible in sysfs; otherwise,
 * apply_workqueue_attrs() may race against userland updating the
 * attributes.
 *
 * Return: 0 on success, -errno on failure.
 */
int workqueue_sysfs_register(struct workqueue_struct *wq)
{
	struct wq_device *wq_dev;
	int ret;

	/*
	 * Adjusting max_active or creating new pwqs by applyting
	 * attributes breaks ordering guarantee. Disallow exposing ordered
	 * workqueues.
	 */
	if (WARN_ON(wq->flags & __WQ_ORDERED))
		return -EINVAL;

	wq->wq_dev = wq_dev = kzalloc(sizeof(*wq_dev), GFP_KERNEL);
	if (!wq_dev)
		return -ENOMEM;

	wq_dev->wq = wq;
	wq_dev->dev.bus = &wq_subsys;
	wq_dev->dev.init_name = wq->name;
	wq_dev->dev.release = wq_device_release;

	/*
	 * unbound_attrs are created separately. Suppress uevent until
	 * everything is ready.
	 */
	dev_set_uevent_suppress(&wq_dev->dev, true);

	ret = device_register(&wq_dev->dev);
	if (ret) {
		kfree(wq_dev);
		wq->wq_dev = NULL;
		return ret;
	}

	if (wq->flags & WQ_UNBOUND) {
		struct device_attribute *attr;

		for (attr = wq_sysfs_unbound_attrs; attr->attr.name; attr++) {
			ret = device_create_file(&wq_dev->dev, attr);
			if (ret) {
				device_unregister(&wq_dev->dev);
				wq->wq_dev = NULL;
				return ret;
			}
		}
	}

	dev_set_uevent_suppress(&wq_dev->dev, false);
	kobject_uevent(&wq_dev->dev.kobj, KOBJ_ADD);
	return 0;
}

/**
 * workqueue_sysfs_unregister - undo workqueue_sysfs_register()
 * @wq: the workqueue to unregister
 *
 * If @wq is registered to sysfs by workqueue_sysfs_register(), unregister.
 */
static void workqueue_sysfs_unregister(struct workqueue_struct *wq)
{
	struct wq_device *wq_dev = wq->wq_dev;

	if (!wq->wq_dev)
		return;

	wq->wq_dev = NULL;
	device_unregister(&wq_dev->dev);
}
#else	/* CONFIG_SYSFS */
static void workqueue_sysfs_unregister(struct workqueue_struct *wq)	{ }
#endif	/* CONFIG_SYSFS */

static void __init wq_numa_init(void)
{
......
@@ -1310,7 +1310,7 @@ bool is_kernel_percpu_address(unsigned long addr)
 * and, from the second one, the backing allocator (currently either vm or
 * km) provides translation.
 *
- * The addr can be tranlated simply without checking if it falls into the
+ * The addr can be translated simply without checking if it falls into the
 * first chunk. But the current code reflects better how percpu allocator
 * actually works, and the verification can discover both bugs in percpu
 * allocator itself and per_cpu_ptr_to_phys() callers. So we keep current
@@ -1762,7 +1762,7 @@ early_param("percpu_alloc", percpu_alloc_setup);
 * and other parameters considering needed percpu size, allocation
 * atom size and distances between CPUs.
 *
- * Groups are always mutliples of atom size and CPUs which are of
+ * Groups are always multiples of atom size and CPUs which are of
 * LOCAL_DISTANCE both ways are grouped together and share space for
 * units in the same group. The returned configuration is guaranteed
 * to have CPUs on different nodes on different groups and >=75% usage
......