Commit c6df3275 authored by Mike Snitzer's avatar Mike Snitzer

Merge remote-tracking branch 'tejun/for-6.9' into dm-6.9-bh-wq

parents fa34e589 1acd92d9
......@@ -7225,6 +7225,15 @@
threshold repeatedly. They are likely good
candidates for using WQ_UNBOUND workqueues instead.
workqueue.cpu_intensive_warning_thresh=<uint>
If CONFIG_WQ_CPU_INTENSIVE_REPORT is set, the kernel
will report the work functions which violate the
intensive_threshold_us repeatedly. In order to prevent
spurious warnings, start printing only after a work
function has violated this threshold number of times.
The default is 4 times. 0 disables the warning.
workqueue.power_efficient
Per-cpu workqueues are generally preferred because
they show better performance thanks to cache
......
......@@ -77,10 +77,12 @@ wants a function to be executed asynchronously it has to set up a work
item pointing to that function and queue that work item on a
workqueue.
Special purpose threads, called worker threads, execute the functions
off of the queue, one after the other. If no work is queued, the
worker threads become idle. These worker threads are managed in so
called worker-pools.
A work item can be executed in either a thread or the BH (softirq) context.
For threaded workqueues, special purpose threads, called [k]workers, execute
the functions off of the queue, one after the other. If no work is queued,
the worker threads become idle. These worker threads are managed in
worker-pools.
The cmwq design differentiates between the user-facing workqueues that
subsystems and drivers queue work items on and the backend mechanism
......@@ -91,6 +93,12 @@ for high priority ones, for each possible CPU and some extra
worker-pools to serve work items queued on unbound workqueues - the
number of these backing pools is dynamic.
BH workqueues use the same framework. However, as there can only be one
concurrent execution context, there's no need to worry about concurrency.
Each per-CPU BH worker pool contains only one pseudo worker which represents
the BH execution context. A BH workqueue can be considered a convenience
interface to softirq.
Subsystems and drivers can create and queue work items through special
workqueue API functions as they see fit. They can influence some
aspects of the way the work items are executed by setting flags on the
......@@ -106,7 +114,7 @@ unless specifically overridden, a work item of a bound workqueue will
be queued on the worklist of either normal or highpri worker-pool that
is associated to the CPU the issuer is running on.
For any worker pool implementation, managing the concurrency level
For any thread pool implementation, managing the concurrency level
(how many execution contexts are active) is an important issue. cmwq
tries to keep the concurrency at a minimal but sufficient level.
Minimal to save resources and sufficient in that the system is used at
......@@ -164,6 +172,17 @@ resources, scheduled and executed.
``flags``
---------
``WQ_BH``
BH workqueues can be considered a convenience interface to softirq. BH
workqueues are always per-CPU and all BH work items are executed in the
queueing CPU's softirq context in the queueing order.
All BH workqueues must have 0 ``max_active`` and ``WQ_HIGHPRI`` is the
only allowed additional flag.
BH work items cannot sleep. All other features such as delayed queueing,
flushing and canceling are supported.
``WQ_UNBOUND``
Work items queued to an unbound wq are served by the special
worker-pools which host workers which are not bound to any
......@@ -237,15 +256,11 @@ may queue at the same time. Unless there is a specific need for
throttling the number of active work items, specifying '0' is
recommended.
Some users depend on the strict execution ordering of ST wq. The
combination of ``@max_active`` of 1 and ``WQ_UNBOUND`` used to
achieve this behavior. Work items on such wq were always queued to the
unbound worker-pools and only one work item could be active at any given
time thus achieving the same ordering property as ST wq.
In the current implementation the above configuration only guarantees
ST behavior within a given NUMA node. Instead ``alloc_ordered_workqueue()`` should
be used to achieve system-wide ST behavior.
Some users depend on strict execution ordering where only one work item
is in flight at any given time and the work items are processed in
queueing order. While the combination of ``@max_active`` of 1 and
``WQ_UNBOUND`` used to achieve this behavior, this is no longer the
case. Use ``alloc_ordered_queue()`` instead.
Example Execution Scenarios
......
......@@ -120,4 +120,5 @@ extern void async_synchronize_cookie(async_cookie_t cookie);
extern void async_synchronize_cookie_domain(async_cookie_t cookie,
struct async_domain *domain);
extern bool current_is_async(void);
extern void async_init(void);
#endif
......@@ -22,20 +22,54 @@
*/
#define work_data_bits(work) ((unsigned long *)(&(work)->data))
enum {
enum work_bits {
WORK_STRUCT_PENDING_BIT = 0, /* work item is pending execution */
WORK_STRUCT_INACTIVE_BIT= 1, /* work item is inactive */
WORK_STRUCT_PWQ_BIT = 2, /* data points to pwq */
WORK_STRUCT_LINKED_BIT = 3, /* next work is linked to this one */
WORK_STRUCT_INACTIVE_BIT, /* work item is inactive */
WORK_STRUCT_PWQ_BIT, /* data points to pwq */
WORK_STRUCT_LINKED_BIT, /* next work is linked to this one */
#ifdef CONFIG_DEBUG_OBJECTS_WORK
WORK_STRUCT_STATIC_BIT = 4, /* static initializer (debugobjects) */
WORK_STRUCT_COLOR_SHIFT = 5, /* color for workqueue flushing */
#else
WORK_STRUCT_COLOR_SHIFT = 4, /* color for workqueue flushing */
WORK_STRUCT_STATIC_BIT, /* static initializer (debugobjects) */
#endif
WORK_STRUCT_FLAG_BITS,
/* color for workqueue flushing */
WORK_STRUCT_COLOR_SHIFT = WORK_STRUCT_FLAG_BITS,
WORK_STRUCT_COLOR_BITS = 4,
/*
* When WORK_STRUCT_PWQ is set, reserve 8 bits off of pwq pointer w/
* debugobjects turned off. This makes pwqs aligned to 256 bytes (512
* bytes w/ DEBUG_OBJECTS_WORK) and allows 16 workqueue flush colors.
*
* MSB
* [ pwq pointer ] [ flush color ] [ STRUCT flags ]
* 4 bits 4 or 5 bits
*/
WORK_STRUCT_PWQ_SHIFT = WORK_STRUCT_COLOR_SHIFT + WORK_STRUCT_COLOR_BITS,
/*
* data contains off-queue information when !WORK_STRUCT_PWQ.
*
* MSB
* [ pool ID ] [ OFFQ flags ] [ STRUCT flags ]
* 1 bit 4 or 5 bits
*/
WORK_OFFQ_FLAG_SHIFT = WORK_STRUCT_FLAG_BITS,
WORK_OFFQ_CANCELING_BIT = WORK_OFFQ_FLAG_SHIFT,
WORK_OFFQ_FLAG_END,
WORK_OFFQ_FLAG_BITS = WORK_OFFQ_FLAG_END - WORK_OFFQ_FLAG_SHIFT,
/*
* When a work item is off queue, the high bits encode off-queue flags
* and the last pool it was on. Cap pool ID to 31 bits and use the
* highest number to indicate that no pool is associated.
*/
WORK_OFFQ_POOL_SHIFT = WORK_OFFQ_FLAG_SHIFT + WORK_OFFQ_FLAG_BITS,
WORK_OFFQ_LEFT = BITS_PER_LONG - WORK_OFFQ_POOL_SHIFT,
WORK_OFFQ_POOL_BITS = WORK_OFFQ_LEFT <= 31 ? WORK_OFFQ_LEFT : 31,
};
enum work_flags {
WORK_STRUCT_PENDING = 1 << WORK_STRUCT_PENDING_BIT,
WORK_STRUCT_INACTIVE = 1 << WORK_STRUCT_INACTIVE_BIT,
WORK_STRUCT_PWQ = 1 << WORK_STRUCT_PWQ_BIT,
......@@ -45,35 +79,14 @@ enum {
#else
WORK_STRUCT_STATIC = 0,
#endif
};
enum wq_misc_consts {
WORK_NR_COLORS = (1 << WORK_STRUCT_COLOR_BITS),
/* not bound to any CPU, prefer the local CPU */
WORK_CPU_UNBOUND = NR_CPUS,
/*
* Reserve 8 bits off of pwq pointer w/ debugobjects turned off.
* This makes pwqs aligned to 256 bytes and allows 16 workqueue
* flush colors.
*/
WORK_STRUCT_FLAG_BITS = WORK_STRUCT_COLOR_SHIFT +
WORK_STRUCT_COLOR_BITS,
/* data contains off-queue information when !WORK_STRUCT_PWQ */
WORK_OFFQ_FLAG_BASE = WORK_STRUCT_COLOR_SHIFT,
__WORK_OFFQ_CANCELING = WORK_OFFQ_FLAG_BASE,
/*
* When a work item is off queue, its high bits point to the last
* pool it was on. Cap at 31 bits and use the highest number to
* indicate that no pool is associated.
*/
WORK_OFFQ_FLAG_BITS = 1,
WORK_OFFQ_POOL_SHIFT = WORK_OFFQ_FLAG_BASE + WORK_OFFQ_FLAG_BITS,
WORK_OFFQ_LEFT = BITS_PER_LONG - WORK_OFFQ_POOL_SHIFT,
WORK_OFFQ_POOL_BITS = WORK_OFFQ_LEFT <= 31 ? WORK_OFFQ_LEFT : 31,
/* bit mask for work_busy() return values */
WORK_BUSY_PENDING = 1 << 0,
WORK_BUSY_RUNNING = 1 << 1,
......@@ -83,12 +96,10 @@ enum {
};
/* Convenience constants - of type 'unsigned long', not 'enum'! */
#define WORK_OFFQ_CANCELING (1ul << __WORK_OFFQ_CANCELING)
#define WORK_OFFQ_CANCELING (1ul << WORK_OFFQ_CANCELING_BIT)
#define WORK_OFFQ_POOL_NONE ((1ul << WORK_OFFQ_POOL_BITS) - 1)
#define WORK_STRUCT_NO_POOL (WORK_OFFQ_POOL_NONE << WORK_OFFQ_POOL_SHIFT)
#define WORK_STRUCT_FLAG_MASK ((1ul << WORK_STRUCT_FLAG_BITS) - 1)
#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
#define WORK_STRUCT_PWQ_MASK (~((1ul << WORK_STRUCT_PWQ_SHIFT) - 1))
#define WORK_DATA_INIT() ATOMIC_LONG_INIT((unsigned long)WORK_STRUCT_NO_POOL)
#define WORK_DATA_STATIC_INIT() \
......@@ -347,7 +358,8 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; }
* Workqueue flags and constants. For details, please refer to
* Documentation/core-api/workqueue.rst.
*/
enum {
enum wq_flags {
WQ_BH = 1 << 0, /* execute in bottom half (softirq) context */
WQ_UNBOUND = 1 << 1, /* not bound to any cpu */
WQ_FREEZABLE = 1 << 2, /* freeze during suspend */
WQ_MEM_RECLAIM = 1 << 3, /* may be used for memory reclaim */
......@@ -386,11 +398,22 @@ enum {
__WQ_DRAINING = 1 << 16, /* internal: workqueue is draining */
__WQ_ORDERED = 1 << 17, /* internal: workqueue is ordered */
__WQ_LEGACY = 1 << 18, /* internal: create*_workqueue() */
__WQ_ORDERED_EXPLICIT = 1 << 19, /* internal: alloc_ordered_workqueue() */
/* BH wq only allows the following flags */
__WQ_BH_ALLOWS = WQ_BH | WQ_HIGHPRI,
};
enum wq_consts {
WQ_MAX_ACTIVE = 512, /* I like 512, better ideas? */
WQ_UNBOUND_MAX_ACTIVE = WQ_MAX_ACTIVE,
WQ_DFL_ACTIVE = WQ_MAX_ACTIVE / 2,
/*
* Per-node default cap on min_active. Unless explicitly set, min_active
* is set to min(max_active, WQ_DFL_MIN_ACTIVE). For more details, see
* workqueue_struct->min_active definition.
*/
WQ_DFL_MIN_ACTIVE = 8,
};
/*
......@@ -420,6 +443,9 @@ enum {
* they are same as their non-power-efficient counterparts - e.g.
* system_power_efficient_wq is identical to system_wq if
* 'wq_power_efficient' is disabled. See WQ_POWER_EFFICIENT for more info.
*
* system_bh[_highpri]_wq are convenience interface to softirq. BH work items
* are executed in the queueing CPU's BH context in the queueing order.
*/
extern struct workqueue_struct *system_wq;
extern struct workqueue_struct *system_highpri_wq;
......@@ -428,16 +454,43 @@ extern struct workqueue_struct *system_unbound_wq;
extern struct workqueue_struct *system_freezable_wq;
extern struct workqueue_struct *system_power_efficient_wq;
extern struct workqueue_struct *system_freezable_power_efficient_wq;
extern struct workqueue_struct *system_bh_wq;
extern struct workqueue_struct *system_bh_highpri_wq;
void workqueue_softirq_action(bool highpri);
void workqueue_softirq_dead(unsigned int cpu);
/**
* alloc_workqueue - allocate a workqueue
* @fmt: printf format for the name of the workqueue
* @flags: WQ_* flags
* @max_active: max in-flight work items per CPU, 0 for default
* @max_active: max in-flight work items, 0 for default
* remaining args: args for @fmt
*
* Allocate a workqueue with the specified parameters. For detailed
* information on WQ_* flags, please refer to
* For a per-cpu workqueue, @max_active limits the number of in-flight work
* items for each CPU. e.g. @max_active of 1 indicates that each CPU can be
* executing at most one work item for the workqueue.
*
* For unbound workqueues, @max_active limits the number of in-flight work items
* for the whole system. e.g. @max_active of 16 indicates that that there can be
* at most 16 work items executing for the workqueue in the whole system.
*
* As sharing the same active counter for an unbound workqueue across multiple
* NUMA nodes can be expensive, @max_active is distributed to each NUMA node
* according to the proportion of the number of online CPUs and enforced
* independently.
*
* Depending on online CPU distribution, a node may end up with per-node
* max_active which is significantly lower than @max_active, which can lead to
* deadlocks if the per-node concurrency limit is lower than the maximum number
* of interdependent work items for the workqueue.
*
* To guarantee forward progress regardless of online CPU distribution, the
* concurrency limit on every node is guaranteed to be equal to or greater than
* min_active which is set to min(@max_active, %WQ_DFL_MIN_ACTIVE). This means
* that the sum of per-node max_active's may be larger than @max_active.
*
* For detailed information on %WQ_* flags, please refer to
* Documentation/core-api/workqueue.rst.
*
* RETURNS:
......@@ -460,8 +513,7 @@ alloc_workqueue(const char *fmt, unsigned int flags, int max_active, ...);
* Pointer to the allocated workqueue on success, %NULL on failure.
*/
#define alloc_ordered_workqueue(fmt, flags, args...) \
alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | \
__WQ_ORDERED_EXPLICIT | (flags), 1, ##args)
alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | (flags), 1, ##args)
#define create_workqueue(name) \
alloc_workqueue("%s", __WQ_LEGACY | WQ_MEM_RECLAIM, 1, (name))
......@@ -471,6 +523,9 @@ alloc_workqueue(const char *fmt, unsigned int flags, int max_active, ...);
#define create_singlethread_workqueue(name) \
alloc_ordered_workqueue("%s", __WQ_LEGACY | WQ_MEM_RECLAIM, name)
#define from_work(var, callback_work, work_fieldname) \
container_of(callback_work, typeof(*var), work_fieldname)
extern void destroy_workqueue(struct workqueue_struct *wq);
struct workqueue_attrs *alloc_workqueue_attrs(void);
......@@ -508,6 +563,8 @@ extern bool flush_rcu_work(struct rcu_work *rwork);
extern void workqueue_set_max_active(struct workqueue_struct *wq,
int max_active);
extern void workqueue_set_min_active(struct workqueue_struct *wq,
int min_active);
extern struct work_struct *current_work(void);
extern bool current_is_workqueue_rescuer(void);
extern bool workqueue_congested(int cpu, struct workqueue_struct *wq);
......
......@@ -106,7 +106,7 @@ config CONSTRUCTORS
bool
config IRQ_WORK
bool
def_bool y if SMP
config BUILDTIME_TABLE_SORT
bool
......
......@@ -1545,6 +1545,7 @@ static noinline void __init kernel_init_freeable(void)
sched_init_smp();
workqueue_init_topology();
async_init();
padata_init();
page_alloc_init_late();
......
......@@ -64,6 +64,7 @@ static async_cookie_t next_cookie = 1;
static LIST_HEAD(async_global_pending); /* pending from all registered doms */
static ASYNC_DOMAIN(async_dfl_domain);
static DEFINE_SPINLOCK(async_lock);
static struct workqueue_struct *async_wq;
struct async_entry {
struct list_head domain_list;
......@@ -174,7 +175,7 @@ static async_cookie_t __async_schedule_node_domain(async_func_t func,
spin_unlock_irqrestore(&async_lock, flags);
/* schedule for execution */
queue_work_node(node, system_unbound_wq, &entry->work);
queue_work_node(node, async_wq, &entry->work);
return newcookie;
}
......@@ -345,3 +346,17 @@ bool current_is_async(void)
return worker && worker->current_func == async_run_entry_fn;
}
EXPORT_SYMBOL_GPL(current_is_async);
void __init async_init(void)
{
/*
* Async can schedule a number of interdependent work items. However,
* unbound workqueues can handle only upto min_active interdependent
* work items. The default min_active of 8 isn't sufficient for async
* and can lead to stalls. Let's use a dedicated workqueue with raised
* min_active.
*/
async_wq = alloc_workqueue("async", WQ_UNBOUND, 0);
BUG_ON(!async_wq);
workqueue_set_min_active(async_wq, WQ_DFL_ACTIVE);
}
......@@ -27,6 +27,7 @@
#include <linux/tick.h>
#include <linux/irq.h>
#include <linux/wait_bit.h>
#include <linux/workqueue.h>
#include <asm/softirq_stack.h>
......@@ -802,11 +803,13 @@ static void tasklet_action_common(struct softirq_action *a,
static __latent_entropy void tasklet_action(struct softirq_action *a)
{
workqueue_softirq_action(false);
tasklet_action_common(a, this_cpu_ptr(&tasklet_vec), TASKLET_SOFTIRQ);
}
static __latent_entropy void tasklet_hi_action(struct softirq_action *a)
{
workqueue_softirq_action(true);
tasklet_action_common(a, this_cpu_ptr(&tasklet_hi_vec), HI_SOFTIRQ);
}
......@@ -929,6 +932,8 @@ static void run_ksoftirqd(unsigned int cpu)
#ifdef CONFIG_HOTPLUG_CPU
static int takeover_tasklets(unsigned int cpu)
{
workqueue_softirq_dead(cpu);
/* CPU is dead, so no lock needed. */
local_irq_disable();
......
......@@ -29,6 +29,7 @@
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/interrupt.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
......@@ -53,10 +54,11 @@
#include <linux/nmi.h>
#include <linux/kvm_para.h>
#include <linux/delay.h>
#include <linux/irq_work.h>
#include "workqueue_internal.h"
enum {
enum worker_pool_flags {
/*
* worker_pool flags
*
......@@ -72,10 +74,17 @@ enum {
* Note that DISASSOCIATED should be flipped only while holding
* wq_pool_attach_mutex to avoid changing binding state while
* worker_attach_to_pool() is in progress.
*
* As there can only be one concurrent BH execution context per CPU, a
* BH pool is per-CPU and always DISASSOCIATED.
*/
POOL_MANAGER_ACTIVE = 1 << 0, /* being managed */
POOL_BH = 1 << 0, /* is a BH pool */
POOL_MANAGER_ACTIVE = 1 << 1, /* being managed */
POOL_DISASSOCIATED = 1 << 2, /* cpu can't serve workers */
POOL_BH_DRAINING = 1 << 3, /* draining after CPU offline */
};
enum worker_flags {
/* worker flags */
WORKER_DIE = 1 << 1, /* die die die */
WORKER_IDLE = 1 << 2, /* is idle */
......@@ -86,7 +95,13 @@ enum {
WORKER_NOT_RUNNING = WORKER_PREP | WORKER_CPU_INTENSIVE |
WORKER_UNBOUND | WORKER_REBOUND,
};
enum work_cancel_flags {
WORK_CANCEL_DELAYED = 1 << 0, /* canceling a delayed_work */
};
enum wq_internal_consts {
NR_STD_WORKER_POOLS = 2, /* # standard pools per cpu */
UNBOUND_POOL_HASH_ORDER = 6, /* hashed by pool->attrs */
......@@ -108,9 +123,17 @@ enum {
RESCUER_NICE_LEVEL = MIN_NICE,
HIGHPRI_NICE_LEVEL = MIN_NICE,
WQ_NAME_LEN = 24,
WQ_NAME_LEN = 32,
};
/*
* We don't want to trap softirq for too long. See MAX_SOFTIRQ_TIME and
* MAX_SOFTIRQ_RESTART in kernel/softirq.c. These are macros because
* msecs_to_jiffies() can't be an initializer.
*/
#define BH_WORKER_JIFFIES msecs_to_jiffies(2)
#define BH_WORKER_RESTARTS 10
/*
* Structure fields follow one of the following exclusion rules.
*
......@@ -122,6 +145,9 @@ enum {
*
* L: pool->lock protected. Access with pool->lock held.
*
* LN: pool->lock and wq_node_nr_active->lock protected for writes. Either for
* reads.
*
* K: Only modified by worker while holding pool->lock. Can be safely read by
* self, while holding pool->lock or from IRQ context if %current is the
* kworker.
......@@ -143,6 +169,9 @@ enum {
*
* WR: wq->mutex protected for writes. RCU protected for reads.
*
* WO: wq->mutex protected for writes. Updated with WRITE_ONCE() and can be read
* with READ_ONCE() without locking.
*
* MD: wq_mayday_lock protected.
*
* WD: Used internally by the watchdog.
......@@ -219,7 +248,7 @@ enum pool_workqueue_stats {
};
/*
* The per-pool workqueue. While queued, the lower WORK_STRUCT_FLAG_BITS
* The per-pool workqueue. While queued, bits below WORK_PWQ_SHIFT
* of work_struct->data are used for flags and the remaining high bits
* point to the pwq; thus, pwqs need to be aligned at two's power of the
* number of flag bits.
......@@ -232,6 +261,7 @@ struct pool_workqueue {
int refcnt; /* L: reference count */
int nr_in_flight[WORK_NR_COLORS];
/* L: nr of in_flight works */
bool plugged; /* L: execution suspended */
/*
* nr_active management and WORK_STRUCT_INACTIVE:
......@@ -240,18 +270,18 @@ struct pool_workqueue {
* pwq->inactive_works instead of pool->worklist and marked with
* WORK_STRUCT_INACTIVE.
*
* All work items marked with WORK_STRUCT_INACTIVE do not participate
* in pwq->nr_active and all work items in pwq->inactive_works are
* marked with WORK_STRUCT_INACTIVE. But not all WORK_STRUCT_INACTIVE
* work items are in pwq->inactive_works. Some of them are ready to
* run in pool->worklist or worker->scheduled. Those work itmes are
* only struct wq_barrier which is used for flush_work() and should
* not participate in pwq->nr_active. For non-barrier work item, it
* is marked with WORK_STRUCT_INACTIVE iff it is in pwq->inactive_works.
* All work items marked with WORK_STRUCT_INACTIVE do not participate in
* nr_active and all work items in pwq->inactive_works are marked with
* WORK_STRUCT_INACTIVE. But not all WORK_STRUCT_INACTIVE work items are
* in pwq->inactive_works. Some of them are ready to run in
* pool->worklist or worker->scheduled. Those work itmes are only struct
* wq_barrier which is used for flush_work() and should not participate
* in nr_active. For non-barrier work item, it is marked with
* WORK_STRUCT_INACTIVE iff it is in pwq->inactive_works.
*/
int nr_active; /* L: nr of active works */
int max_active; /* L: max active works */
struct list_head inactive_works; /* L: inactive works */
struct list_head pending_node; /* LN: node on wq_node_nr_active->pending_pwqs */
struct list_head pwqs_node; /* WR: node on wq->pwqs */
struct list_head mayday_node; /* MD: node on wq->maydays */
......@@ -265,7 +295,7 @@ struct pool_workqueue {
*/
struct kthread_work release_work;
struct rcu_head rcu;
} __aligned(1 << WORK_STRUCT_FLAG_BITS);
} __aligned(1 << WORK_STRUCT_PWQ_SHIFT);
/*
* Structure used to wait for workqueue flush.
......@@ -278,6 +308,26 @@ struct wq_flusher {
struct wq_device;
/*
* Unlike in a per-cpu workqueue where max_active limits its concurrency level
* on each CPU, in an unbound workqueue, max_active applies to the whole system.
* As sharing a single nr_active across multiple sockets can be very expensive,
* the counting and enforcement is per NUMA node.
*
* The following struct is used to enforce per-node max_active. When a pwq wants
* to start executing a work item, it should increment ->nr using
* tryinc_node_nr_active(). If acquisition fails due to ->nr already being over
* ->max, the pwq is queued on ->pending_pwqs. As in-flight work items finish
* and decrement ->nr, node_activate_pending_pwq() activates the pending pwqs in
* round-robin order.
*/
struct wq_node_nr_active {
int max; /* per-node max_active */
atomic_t nr; /* per-node nr_active */
raw_spinlock_t lock; /* nests inside pool locks */
struct list_head pending_pwqs; /* LN: pwqs with inactive works */
};
/*
* The externally visible workqueue. It relays the issued work items to
* the appropriate worker_pool through its pool_workqueues.
......@@ -298,10 +348,15 @@ struct workqueue_struct {
struct worker *rescuer; /* MD: rescue worker */
int nr_drainers; /* WQ: drain in progress */
int saved_max_active; /* WQ: saved pwq max_active */
/* See alloc_workqueue() function comment for info on min/max_active */
int max_active; /* WO: max active works */
int min_active; /* WO: min active works */
int saved_max_active; /* WQ: saved max_active */
int saved_min_active; /* WQ: saved min_active */
struct workqueue_attrs *unbound_attrs; /* PW: only for unbound wqs */
struct pool_workqueue *dfl_pwq; /* PW: only for unbound wqs */
struct pool_workqueue __rcu *dfl_pwq; /* PW: only for unbound wqs */
#ifdef CONFIG_SYSFS
struct wq_device *wq_dev; /* I: for sysfs interface */
......@@ -323,10 +378,9 @@ struct workqueue_struct {
/* hot fields used during command issue, aligned to cacheline */
unsigned int flags ____cacheline_aligned; /* WQ: WQ_* flags */
struct pool_workqueue __percpu __rcu **cpu_pwq; /* I: per-cpu pwqs */
struct wq_node_nr_active *node_nr_active[]; /* I: per-node nr_active */
};
static struct kmem_cache *pwq_cache;
/*
* Each pod type describes how CPUs should be grouped for unbound workqueues.
* See the comment above workqueue_attrs->affn_scope.
......@@ -338,16 +392,13 @@ struct wq_pod_type {
int *cpu_pod; /* cpu -> pod */
};
static struct wq_pod_type wq_pod_types[WQ_AFFN_NR_TYPES];
static enum wq_affn_scope wq_affn_dfl = WQ_AFFN_CACHE;
static const char *wq_affn_names[WQ_AFFN_NR_TYPES] = {
[WQ_AFFN_DFL] = "default",
[WQ_AFFN_CPU] = "cpu",
[WQ_AFFN_SMT] = "smt",
[WQ_AFFN_CACHE] = "cache",
[WQ_AFFN_NUMA] = "numa",
[WQ_AFFN_SYSTEM] = "system",
[WQ_AFFN_DFL] = "default",
[WQ_AFFN_CPU] = "cpu",
[WQ_AFFN_SMT] = "smt",
[WQ_AFFN_CACHE] = "cache",
[WQ_AFFN_NUMA] = "numa",
[WQ_AFFN_SYSTEM] = "system",
};
/*
......@@ -359,12 +410,22 @@ static const char *wq_affn_names[WQ_AFFN_NR_TYPES] = {
*/
static unsigned long wq_cpu_intensive_thresh_us = ULONG_MAX;
module_param_named(cpu_intensive_thresh_us, wq_cpu_intensive_thresh_us, ulong, 0644);
#ifdef CONFIG_WQ_CPU_INTENSIVE_REPORT
static unsigned int wq_cpu_intensive_warning_thresh = 4;
module_param_named(cpu_intensive_warning_thresh, wq_cpu_intensive_warning_thresh, uint, 0644);
#endif
/* see the comment above the definition of WQ_POWER_EFFICIENT */
static bool wq_power_efficient = IS_ENABLED(CONFIG_WQ_POWER_EFFICIENT_DEFAULT);
module_param_named(power_efficient, wq_power_efficient, bool, 0444);
static bool wq_online; /* can kworkers be created yet? */
static bool wq_topo_initialized __read_mostly = false;
static struct kmem_cache *pwq_cache;
static struct wq_pod_type wq_pod_types[WQ_AFFN_NR_TYPES];
static enum wq_affn_scope wq_affn_dfl = WQ_AFFN_CACHE;
/* buf for wq_update_unbound_pod_attrs(), protected by CPU hotplug exclusion */
static struct workqueue_attrs *wq_update_pod_attrs_buf;
......@@ -405,8 +466,17 @@ static bool wq_debug_force_rr_cpu = false;
#endif
module_param_named(debug_force_rr_cpu, wq_debug_force_rr_cpu, bool, 0644);
/* to raise softirq for the BH worker pools on other CPUs */
static DEFINE_PER_CPU_SHARED_ALIGNED(struct irq_work [NR_STD_WORKER_POOLS],
bh_pool_irq_works);
/* the BH worker pools */
static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
bh_worker_pools);
/* the per-cpu worker pools */
static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);
static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
cpu_worker_pools);
static DEFINE_IDR(worker_pool_idr); /* PR: idr of all pools */
......@@ -419,6 +489,12 @@ static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS];
/* I: attributes used when instantiating ordered pools on demand */
static struct workqueue_attrs *ordered_wq_attrs[NR_STD_WORKER_POOLS];
/*
* Used to synchronize multiple cancel_sync attempts on the same work item. See
* work_grab_pending() and __cancel_work_sync().
*/
static DECLARE_WAIT_QUEUE_HEAD(wq_cancel_waitq);
/*
* I: kthread_worker to release pwq's. pwq release needs to be bounced to a
* process context while holding a pool lock. Bounce to a dedicated kthread
......@@ -440,6 +516,10 @@ struct workqueue_struct *system_power_efficient_wq __ro_after_init;
EXPORT_SYMBOL_GPL(system_power_efficient_wq);
struct workqueue_struct *system_freezable_power_efficient_wq __ro_after_init;
EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq);
struct workqueue_struct *system_bh_wq;
EXPORT_SYMBOL_GPL(system_bh_wq);
struct workqueue_struct *system_bh_highpri_wq;
EXPORT_SYMBOL_GPL(system_bh_highpri_wq);
static int worker_thread(void *__worker);
static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
......@@ -450,16 +530,21 @@ static void show_one_worker_pool(struct worker_pool *pool);
#include <trace/events/workqueue.h>
#define assert_rcu_or_pool_mutex() \
RCU_LOCKDEP_WARN(!rcu_read_lock_held() && \
RCU_LOCKDEP_WARN(!rcu_read_lock_any_held() && \
!lockdep_is_held(&wq_pool_mutex), \
"RCU or wq_pool_mutex should be held")
#define assert_rcu_or_wq_mutex_or_pool_mutex(wq) \
RCU_LOCKDEP_WARN(!rcu_read_lock_held() && \
RCU_LOCKDEP_WARN(!rcu_read_lock_any_held() && \
!lockdep_is_held(&wq->mutex) && \
!lockdep_is_held(&wq_pool_mutex), \
"RCU, wq->mutex or wq_pool_mutex should be held")
#define for_each_bh_worker_pool(pool, cpu) \
for ((pool) = &per_cpu(bh_worker_pools, cpu)[0]; \
(pool) < &per_cpu(bh_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
(pool)++)
#define for_each_cpu_worker_pool(pool, cpu) \
for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0]; \
(pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
......@@ -632,6 +717,36 @@ static int worker_pool_assign_id(struct worker_pool *pool)
return ret;
}
static struct pool_workqueue __rcu **
unbound_pwq_slot(struct workqueue_struct *wq, int cpu)
{
if (cpu >= 0)
return per_cpu_ptr(wq->cpu_pwq, cpu);
else
return &wq->dfl_pwq;
}
/* @cpu < 0 for dfl_pwq */
static struct pool_workqueue *unbound_pwq(struct workqueue_struct *wq, int cpu)
{
return rcu_dereference_check(*unbound_pwq_slot(wq, cpu),
lockdep_is_held(&wq_pool_mutex) ||
lockdep_is_held(&wq->mutex));
}
/**
* unbound_effective_cpumask - effective cpumask of an unbound workqueue
* @wq: workqueue of interest
*
* @wq->unbound_attrs->cpumask contains the cpumask requested by the user which
* is masked with wq_unbound_cpumask to determine the effective cpumask. The
* default pwq is always mapped to the pool with the current effective cpumask.
*/
static struct cpumask *unbound_effective_cpumask(struct workqueue_struct *wq)
{
return unbound_pwq(wq, -1)->pool->attrs->__pod_cpumask;
}
static unsigned int work_color_to_flags(int color)
{
return color << WORK_STRUCT_COLOR_SHIFT;
......@@ -653,10 +768,9 @@ static int work_next_color(int color)
* contain the pointer to the queued pwq. Once execution starts, the flag
* is cleared and the high bits contain OFFQ flags and pool ID.
*
* set_work_pwq(), set_work_pool_and_clear_pending(), mark_work_canceling()
* and clear_work_data() can be used to set the pwq, pool or clear
* work->data. These functions should only be called while the work is
* owned - ie. while the PENDING bit is set.
* set_work_pwq(), set_work_pool_and_clear_pending() and mark_work_canceling()
* can be used to set the pwq, pool or clear work->data. These functions should
* only be called while the work is owned - ie. while the PENDING bit is set.
*
* get_work_pool() and get_work_pwq() can be used to obtain the pool or pwq
* corresponding to a work. Pool is available once the work has been
......@@ -668,29 +782,28 @@ static int work_next_color(int color)
* but stay off timer and worklist for arbitrarily long and nobody should
* try to steal the PENDING bit.
*/
static inline void set_work_data(struct work_struct *work, unsigned long data,
unsigned long flags)
static inline void set_work_data(struct work_struct *work, unsigned long data)
{
WARN_ON_ONCE(!work_pending(work));
atomic_long_set(&work->data, data | flags | work_static(work));
atomic_long_set(&work->data, data | work_static(work));
}
static void set_work_pwq(struct work_struct *work, struct pool_workqueue *pwq,
unsigned long extra_flags)
unsigned long flags)
{
set_work_data(work, (unsigned long)pwq,
WORK_STRUCT_PENDING | WORK_STRUCT_PWQ | extra_flags);
set_work_data(work, (unsigned long)pwq | WORK_STRUCT_PENDING |
WORK_STRUCT_PWQ | flags);
}
static void set_work_pool_and_keep_pending(struct work_struct *work,
int pool_id)
int pool_id, unsigned long flags)
{
set_work_data(work, (unsigned long)pool_id << WORK_OFFQ_POOL_SHIFT,
WORK_STRUCT_PENDING);
set_work_data(work, ((unsigned long)pool_id << WORK_OFFQ_POOL_SHIFT) |
WORK_STRUCT_PENDING | flags);
}
static void set_work_pool_and_clear_pending(struct work_struct *work,
int pool_id)
int pool_id, unsigned long flags)
{
/*
* The following wmb is paired with the implied mb in
......@@ -699,7 +812,8 @@ static void set_work_pool_and_clear_pending(struct work_struct *work,
* owner.
*/
smp_wmb();
set_work_data(work, (unsigned long)pool_id << WORK_OFFQ_POOL_SHIFT, 0);
set_work_data(work, ((unsigned long)pool_id << WORK_OFFQ_POOL_SHIFT) |
flags);
/*
* The following mb guarantees that previous clear of a PENDING bit
* will not be reordered with any speculative LOADS or STORES from
......@@ -731,15 +845,9 @@ static void set_work_pool_and_clear_pending(struct work_struct *work,
smp_mb();
}
static void clear_work_data(struct work_struct *work)
{
smp_wmb(); /* see set_work_pool_and_clear_pending() */
set_work_data(work, WORK_STRUCT_NO_POOL, 0);
}
static inline struct pool_workqueue *work_struct_pwq(unsigned long data)
{
return (struct pool_workqueue *)(data & WORK_STRUCT_WQ_DATA_MASK);
return (struct pool_workqueue *)(data & WORK_STRUCT_PWQ_MASK);
}
static struct pool_workqueue *get_work_pwq(struct work_struct *work)
......@@ -806,7 +914,7 @@ static void mark_work_canceling(struct work_struct *work)
unsigned long pool_id = get_work_pool_id(work);
pool_id <<= WORK_OFFQ_POOL_SHIFT;
set_work_data(work, pool_id | WORK_OFFQ_CANCELING, WORK_STRUCT_PENDING);
set_work_data(work, pool_id | WORK_STRUCT_PENDING | WORK_OFFQ_CANCELING);
}
static bool work_is_canceling(struct work_struct *work)
......@@ -1101,6 +1209,29 @@ static bool assign_work(struct work_struct *work, struct worker *worker,
return true;
}
static struct irq_work *bh_pool_irq_work(struct worker_pool *pool)
{
int high = pool->attrs->nice == HIGHPRI_NICE_LEVEL ? 1 : 0;
return &per_cpu(bh_pool_irq_works, pool->cpu)[high];
}
static void kick_bh_pool(struct worker_pool *pool)
{
#ifdef CONFIG_SMP
/* see drain_dead_softirq_workfn() for BH_DRAINING */
if (unlikely(pool->cpu != smp_processor_id() &&
!(pool->flags & POOL_BH_DRAINING))) {
irq_work_queue_on(bh_pool_irq_work(pool), pool->cpu);
return;
}
#endif
if (pool->attrs->nice == HIGHPRI_NICE_LEVEL)
raise_softirq_irqoff(HI_SOFTIRQ);
else
raise_softirq_irqoff(TASKLET_SOFTIRQ);
}
/**
* kick_pool - wake up an idle worker if necessary
* @pool: pool to kick
......@@ -1118,6 +1249,11 @@ static bool kick_pool(struct worker_pool *pool)
if (!need_more_worker(pool) || !worker)
return false;
if (pool->flags & POOL_BH) {
kick_bh_pool(pool);
return true;
}
p = worker->task;
#ifdef CONFIG_SMP
......@@ -1198,11 +1334,13 @@ static void wq_cpu_intensive_report(work_func_t func)
u64 cnt;
/*
* Start reporting from the fourth time and back off
* Start reporting from the warning_thresh and back off
* exponentially.
*/
cnt = atomic64_inc_return_relaxed(&ent->cnt);
if (cnt >= 4 && is_power_of_2(cnt))
if (wq_cpu_intensive_warning_thresh &&
cnt >= wq_cpu_intensive_warning_thresh &&
is_power_of_2(cnt + 1 - wq_cpu_intensive_warning_thresh))
printk_deferred(KERN_WARNING "workqueue: %ps hogged CPU for >%luus %llu times, consider switching to WQ_UNBOUND\n",
ent->func, wq_cpu_intensive_thresh_us,
atomic64_read(&ent->cnt));
......@@ -1231,10 +1369,12 @@ static void wq_cpu_intensive_report(work_func_t func)
ent = &wci_ents[wci_nr_ents++];
ent->func = func;
atomic64_set(&ent->cnt, 1);
atomic64_set(&ent->cnt, 0);
hash_add_rcu(wci_hash, &ent->hash_node, (unsigned long)func);
raw_spin_unlock(&wci_lock);
goto restart;
}
#else /* CONFIG_WQ_CPU_INTENSIVE_REPORT */
......@@ -1401,6 +1541,74 @@ work_func_t wq_worker_last_func(struct task_struct *task)
return worker->last_func;
}
/**
* wq_node_nr_active - Determine wq_node_nr_active to use
* @wq: workqueue of interest
* @node: NUMA node, can be %NUMA_NO_NODE
*
* Determine wq_node_nr_active to use for @wq on @node. Returns:
*
* - %NULL for per-cpu workqueues as they don't need to use shared nr_active.
*
* - node_nr_active[nr_node_ids] if @node is %NUMA_NO_NODE.
*
* - Otherwise, node_nr_active[@node].
*/
static struct wq_node_nr_active *wq_node_nr_active(struct workqueue_struct *wq,
int node)
{
if (!(wq->flags & WQ_UNBOUND))
return NULL;
if (node == NUMA_NO_NODE)
node = nr_node_ids;
return wq->node_nr_active[node];
}
/**
* wq_update_node_max_active - Update per-node max_actives to use
* @wq: workqueue to update
* @off_cpu: CPU that's going down, -1 if a CPU is not going down
*
* Update @wq->node_nr_active[]->max. @wq must be unbound. max_active is
* distributed among nodes according to the proportions of numbers of online
* cpus. The result is always between @wq->min_active and max_active.
*/
static void wq_update_node_max_active(struct workqueue_struct *wq, int off_cpu)
{
struct cpumask *effective = unbound_effective_cpumask(wq);
int min_active = READ_ONCE(wq->min_active);
int max_active = READ_ONCE(wq->max_active);
int total_cpus, node;
lockdep_assert_held(&wq->mutex);
if (!wq_topo_initialized)
return;
if (off_cpu >= 0 && !cpumask_test_cpu(off_cpu, effective))
off_cpu = -1;
total_cpus = cpumask_weight_and(effective, cpu_online_mask);
if (off_cpu >= 0)
total_cpus--;
for_each_node(node) {
int node_cpus;
node_cpus = cpumask_weight_and(effective, cpumask_of_node(node));
if (off_cpu >= 0 && cpu_to_node(off_cpu) == node)
node_cpus--;
wq_node_nr_active(wq, node)->max =
clamp(DIV_ROUND_UP(max_active * node_cpus, total_cpus),
min_active, max_active);
}
wq_node_nr_active(wq, NUMA_NO_NODE)->max = min_active;
}
/**
* get_pwq - get an extra reference on the specified pool_workqueue
* @pwq: pool_workqueue to get
......@@ -1453,24 +1661,336 @@ static void put_pwq_unlocked(struct pool_workqueue *pwq)
}
}
static void pwq_activate_inactive_work(struct work_struct *work)
static bool pwq_is_empty(struct pool_workqueue *pwq)
{
struct pool_workqueue *pwq = get_work_pwq(work);
return !pwq->nr_active && list_empty(&pwq->inactive_works);
}
static void __pwq_activate_work(struct pool_workqueue *pwq,
struct work_struct *work)
{
unsigned long *wdb = work_data_bits(work);
WARN_ON_ONCE(!(*wdb & WORK_STRUCT_INACTIVE));
trace_workqueue_activate_work(work);
if (list_empty(&pwq->pool->worklist))
pwq->pool->watchdog_ts = jiffies;
move_linked_works(work, &pwq->pool->worklist, NULL);
__clear_bit(WORK_STRUCT_INACTIVE_BIT, work_data_bits(work));
__clear_bit(WORK_STRUCT_INACTIVE_BIT, wdb);
}
/**
* pwq_activate_work - Activate a work item if inactive
* @pwq: pool_workqueue @work belongs to
* @work: work item to activate
*
* Returns %true if activated. %false if already active.
*/
static bool pwq_activate_work(struct pool_workqueue *pwq,
struct work_struct *work)
{
struct worker_pool *pool = pwq->pool;
struct wq_node_nr_active *nna;
lockdep_assert_held(&pool->lock);
if (!(*work_data_bits(work) & WORK_STRUCT_INACTIVE))
return false;
nna = wq_node_nr_active(pwq->wq, pool->node);
if (nna)
atomic_inc(&nna->nr);
pwq->nr_active++;
__pwq_activate_work(pwq, work);
return true;
}
static void pwq_activate_first_inactive(struct pool_workqueue *pwq)
static bool tryinc_node_nr_active(struct wq_node_nr_active *nna)
{
struct work_struct *work = list_first_entry(&pwq->inactive_works,
struct work_struct, entry);
int max = READ_ONCE(nna->max);
while (true) {
int old, tmp;
old = atomic_read(&nna->nr);
if (old >= max)
return false;
tmp = atomic_cmpxchg_relaxed(&nna->nr, old, old + 1);
if (tmp == old)
return true;
}
}
/**
* pwq_tryinc_nr_active - Try to increment nr_active for a pwq
* @pwq: pool_workqueue of interest
* @fill: max_active may have increased, try to increase concurrency level
*
* Try to increment nr_active for @pwq. Returns %true if an nr_active count is
* successfully obtained. %false otherwise.
*/
static bool pwq_tryinc_nr_active(struct pool_workqueue *pwq, bool fill)
{
struct workqueue_struct *wq = pwq->wq;
struct worker_pool *pool = pwq->pool;
struct wq_node_nr_active *nna = wq_node_nr_active(wq, pool->node);
bool obtained = false;
lockdep_assert_held(&pool->lock);
if (!nna) {
/* BH or per-cpu workqueue, pwq->nr_active is sufficient */
obtained = pwq->nr_active < READ_ONCE(wq->max_active);
goto out;
}
if (unlikely(pwq->plugged))
return false;
/*
* Unbound workqueue uses per-node shared nr_active $nna. If @pwq is
* already waiting on $nna, pwq_dec_nr_active() will maintain the
* concurrency level. Don't jump the line.
*
* We need to ignore the pending test after max_active has increased as
* pwq_dec_nr_active() can only maintain the concurrency level but not
* increase it. This is indicated by @fill.
*/
if (!list_empty(&pwq->pending_node) && likely(!fill))
goto out;
obtained = tryinc_node_nr_active(nna);
if (obtained)
goto out;
/*
* Lockless acquisition failed. Lock, add ourself to $nna->pending_pwqs
* and try again. The smp_mb() is paired with the implied memory barrier
* of atomic_dec_return() in pwq_dec_nr_active() to ensure that either
* we see the decremented $nna->nr or they see non-empty
* $nna->pending_pwqs.
*/
raw_spin_lock(&nna->lock);
if (list_empty(&pwq->pending_node))
list_add_tail(&pwq->pending_node, &nna->pending_pwqs);
else if (likely(!fill))
goto out_unlock;
smp_mb();
obtained = tryinc_node_nr_active(nna);
/*
* If @fill, @pwq might have already been pending. Being spuriously
* pending in cold paths doesn't affect anything. Let's leave it be.
*/
if (obtained && likely(!fill))
list_del_init(&pwq->pending_node);
out_unlock:
raw_spin_unlock(&nna->lock);
out:
if (obtained)
pwq->nr_active++;
return obtained;
}
/**
* pwq_activate_first_inactive - Activate the first inactive work item on a pwq
* @pwq: pool_workqueue of interest
* @fill: max_active may have increased, try to increase concurrency level
*
* Activate the first inactive work item of @pwq if available and allowed by
* max_active limit.
*
* Returns %true if an inactive work item has been activated. %false if no
* inactive work item is found or max_active limit is reached.
*/
static bool pwq_activate_first_inactive(struct pool_workqueue *pwq, bool fill)
{
struct work_struct *work =
list_first_entry_or_null(&pwq->inactive_works,
struct work_struct, entry);
if (work && pwq_tryinc_nr_active(pwq, fill)) {
__pwq_activate_work(pwq, work);
return true;
} else {
return false;
}
}
/**
* unplug_oldest_pwq - unplug the oldest pool_workqueue
* @wq: workqueue_struct where its oldest pwq is to be unplugged
*
* This function should only be called for ordered workqueues where only the
* oldest pwq is unplugged, the others are plugged to suspend execution to
* ensure proper work item ordering::
*
* dfl_pwq --------------+ [P] - plugged
* |
* v
* pwqs -> A -> B [P] -> C [P] (newest)
* | | |
* 1 3 5
* | | |
* 2 4 6
*
* When the oldest pwq is drained and removed, this function should be called
* to unplug the next oldest one to start its work item execution. Note that
* pwq's are linked into wq->pwqs with the oldest first, so the first one in
* the list is the oldest.
*/
static void unplug_oldest_pwq(struct workqueue_struct *wq)
{
struct pool_workqueue *pwq;
lockdep_assert_held(&wq->mutex);
/* Caller should make sure that pwqs isn't empty before calling */
pwq = list_first_entry_or_null(&wq->pwqs, struct pool_workqueue,
pwqs_node);
raw_spin_lock_irq(&pwq->pool->lock);
if (pwq->plugged) {
pwq->plugged = false;
if (pwq_activate_first_inactive(pwq, true))
kick_pool(pwq->pool);
}
raw_spin_unlock_irq(&pwq->pool->lock);
}
/**
* node_activate_pending_pwq - Activate a pending pwq on a wq_node_nr_active
* @nna: wq_node_nr_active to activate a pending pwq for
* @caller_pool: worker_pool the caller is locking
*
* Activate a pwq in @nna->pending_pwqs. Called with @caller_pool locked.
* @caller_pool may be unlocked and relocked to lock other worker_pools.
*/
static void node_activate_pending_pwq(struct wq_node_nr_active *nna,
struct worker_pool *caller_pool)
{
struct worker_pool *locked_pool = caller_pool;
struct pool_workqueue *pwq;
struct work_struct *work;
lockdep_assert_held(&caller_pool->lock);
raw_spin_lock(&nna->lock);
retry:
pwq = list_first_entry_or_null(&nna->pending_pwqs,
struct pool_workqueue, pending_node);
if (!pwq)
goto out_unlock;
/*
* If @pwq is for a different pool than @locked_pool, we need to lock
* @pwq->pool->lock. Let's trylock first. If unsuccessful, do the unlock
* / lock dance. For that, we also need to release @nna->lock as it's
* nested inside pool locks.
*/
if (pwq->pool != locked_pool) {
raw_spin_unlock(&locked_pool->lock);
locked_pool = pwq->pool;
if (!raw_spin_trylock(&locked_pool->lock)) {
raw_spin_unlock(&nna->lock);
raw_spin_lock(&locked_pool->lock);
raw_spin_lock(&nna->lock);
goto retry;
}
}
/*
* $pwq may not have any inactive work items due to e.g. cancellations.
* Drop it from pending_pwqs and see if there's another one.
*/
work = list_first_entry_or_null(&pwq->inactive_works,
struct work_struct, entry);
if (!work) {
list_del_init(&pwq->pending_node);
goto retry;
}
/*
* Acquire an nr_active count and activate the inactive work item. If
* $pwq still has inactive work items, rotate it to the end of the
* pending_pwqs so that we round-robin through them. This means that
* inactive work items are not activated in queueing order which is fine
* given that there has never been any ordering across different pwqs.
*/
if (likely(tryinc_node_nr_active(nna))) {
pwq->nr_active++;
__pwq_activate_work(pwq, work);
if (list_empty(&pwq->inactive_works))
list_del_init(&pwq->pending_node);
else
list_move_tail(&pwq->pending_node, &nna->pending_pwqs);
/* if activating a foreign pool, make sure it's running */
if (pwq->pool != caller_pool)
kick_pool(pwq->pool);
}
out_unlock:
raw_spin_unlock(&nna->lock);
if (locked_pool != caller_pool) {
raw_spin_unlock(&locked_pool->lock);
raw_spin_lock(&caller_pool->lock);
}
}
/**
* pwq_dec_nr_active - Retire an active count
* @pwq: pool_workqueue of interest
*
* Decrement @pwq's nr_active and try to activate the first inactive work item.
* For unbound workqueues, this function may temporarily drop @pwq->pool->lock.
*/
static void pwq_dec_nr_active(struct pool_workqueue *pwq)
{
struct worker_pool *pool = pwq->pool;
struct wq_node_nr_active *nna = wq_node_nr_active(pwq->wq, pool->node);
lockdep_assert_held(&pool->lock);
/*
* @pwq->nr_active should be decremented for both percpu and unbound
* workqueues.
*/
pwq->nr_active--;
/*
* For a percpu workqueue, it's simple. Just need to kick the first
* inactive work item on @pwq itself.
*/
if (!nna) {
pwq_activate_first_inactive(pwq, false);
return;
}
/*
* If @pwq is for an unbound workqueue, it's more complicated because
* multiple pwqs and pools may be sharing the nr_active count. When a
* pwq needs to wait for an nr_active count, it puts itself on
* $nna->pending_pwqs. The following atomic_dec_return()'s implied
* memory barrier is paired with smp_mb() in pwq_tryinc_nr_active() to
* guarantee that either we see non-empty pending_pwqs or they see
* decremented $nna->nr.
*
* $nna->max may change as CPUs come online/offline and @pwq->wq's
* max_active gets updated. However, it is guaranteed to be equal to or
* larger than @pwq->wq->min_active which is above zero unless freezing.
* This maintains the forward progress guarantee.
*/
if (atomic_dec_return(&nna->nr) >= READ_ONCE(nna->max))
return;
pwq_activate_inactive_work(work);
if (!list_empty(&nna->pending_pwqs))
node_activate_pending_pwq(nna, pool);
}
/**
......@@ -1481,6 +2001,11 @@ static void pwq_activate_first_inactive(struct pool_workqueue *pwq)
* A work either has completed or is removed from pending queue,
* decrement nr_in_flight of its pwq and handle workqueue flushing.
*
* NOTE:
* For unbound workqueues, this function may temporarily drop @pwq->pool->lock
* and thus should be called after all other state updates for the in-flight
* work item is complete.
*
* CONTEXT:
* raw_spin_lock_irq(pool->lock).
*/
......@@ -1488,14 +2013,8 @@ static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, unsigned long work_
{
int color = get_work_color(work_data);
if (!(work_data & WORK_STRUCT_INACTIVE)) {
pwq->nr_active--;
if (!list_empty(&pwq->inactive_works)) {
/* one down, submit an inactive one */
if (pwq->nr_active < pwq->max_active)
pwq_activate_first_inactive(pwq);
}
}
if (!(work_data & WORK_STRUCT_INACTIVE))
pwq_dec_nr_active(pwq);
pwq->nr_in_flight[color]--;
......@@ -1523,8 +2042,8 @@ static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, unsigned long work_
/**
* try_to_grab_pending - steal work item from worklist and disable irq
* @work: work item to steal
* @is_dwork: @work is a delayed_work
* @flags: place to store irq state
* @cflags: %WORK_CANCEL_ flags
* @irq_flags: place to store irq state
*
* Try to grab PENDING bit of @work. This function can handle @work in any
* stable state - idle, on timer or on worklist.
......@@ -1546,20 +2065,20 @@ static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, unsigned long work_
* irqsafe, ensures that we return -EAGAIN for finite short period of time.
*
* On successful return, >= 0, irq is disabled and the caller is
* responsible for releasing it using local_irq_restore(*@flags).
* responsible for releasing it using local_irq_restore(*@irq_flags).
*
* This function is safe to call from any context including IRQ handler.
*/
static int try_to_grab_pending(struct work_struct *work, bool is_dwork,
unsigned long *flags)
static int try_to_grab_pending(struct work_struct *work, u32 cflags,
unsigned long *irq_flags)
{
struct worker_pool *pool;
struct pool_workqueue *pwq;
local_irq_save(*flags);
local_irq_save(*irq_flags);
/* try to steal the timer if it exists */
if (is_dwork) {
if (cflags & WORK_CANCEL_DELAYED) {
struct delayed_work *dwork = to_delayed_work(work);
/*
......@@ -1595,6 +2114,8 @@ static int try_to_grab_pending(struct work_struct *work, bool is_dwork,
*/
pwq = get_work_pwq(work);
if (pwq && pwq->pool == pool) {
unsigned long work_data;
debug_work_deactivate(work);
/*
......@@ -1608,14 +2129,19 @@ static int try_to_grab_pending(struct work_struct *work, bool is_dwork,
* management later on and cause stall. Make sure the work
* item is activated before grabbing.
*/
if (*work_data_bits(work) & WORK_STRUCT_INACTIVE)
pwq_activate_inactive_work(work);
pwq_activate_work(pwq, work);
list_del_init(&work->entry);
pwq_dec_nr_in_flight(pwq, *work_data_bits(work));
/* work->data points to pwq iff queued, point to pool */
set_work_pool_and_keep_pending(work, pool->id);
/*
* work->data points to pwq iff queued. Let's point to pool. As
* this destroys work->data needed by the next step, stash it.
*/
work_data = *work_data_bits(work);
set_work_pool_and_keep_pending(work, pool->id, 0);
/* must be the last step, see the function comment */
pwq_dec_nr_in_flight(pwq, work_data);
raw_spin_unlock(&pool->lock);
rcu_read_unlock();
......@@ -1624,13 +2150,82 @@ static int try_to_grab_pending(struct work_struct *work, bool is_dwork,
raw_spin_unlock(&pool->lock);
fail:
rcu_read_unlock();
local_irq_restore(*flags);
local_irq_restore(*irq_flags);
if (work_is_canceling(work))
return -ENOENT;
cpu_relax();
return -EAGAIN;
}
struct cwt_wait {
wait_queue_entry_t wait;
struct work_struct *work;
};
static int cwt_wakefn(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
struct cwt_wait *cwait = container_of(wait, struct cwt_wait, wait);
if (cwait->work != key)
return 0;
return autoremove_wake_function(wait, mode, sync, key);
}
/**
* work_grab_pending - steal work item from worklist and disable irq
* @work: work item to steal
* @cflags: %WORK_CANCEL_ flags
* @irq_flags: place to store IRQ state
*
* Grab PENDING bit of @work. @work can be in any stable state - idle, on timer
* or on worklist.
*
* Must be called in process context. IRQ is disabled on return with IRQ state
* stored in *@irq_flags. The caller is responsible for re-enabling it using
* local_irq_restore().
*
* Returns %true if @work was pending. %false if idle.
*/
static bool work_grab_pending(struct work_struct *work, u32 cflags,
unsigned long *irq_flags)
{
struct cwt_wait cwait;
int ret;
might_sleep();
repeat:
ret = try_to_grab_pending(work, cflags, irq_flags);
if (likely(ret >= 0))
return ret;
if (ret != -ENOENT)
goto repeat;
/*
* Someone is already canceling. Wait for it to finish. flush_work()
* doesn't work for PREEMPT_NONE because we may get woken up between
* @work's completion and the other canceling task resuming and clearing
* CANCELING - flush_work() will return false immediately as @work is no
* longer busy, try_to_grab_pending() will return -ENOENT as @work is
* still being canceled and the other canceling task won't be able to
* clear CANCELING as we're hogging the CPU.
*
* Let's wait for completion using a waitqueue. As this may lead to the
* thundering herd problem, use a custom wake function which matches
* @work along with exclusive wait and wakeup.
*/
init_wait(&cwait.wait);
cwait.wait.func = cwt_wakefn;
cwait.work = work;
prepare_to_wait_exclusive(&wq_cancel_waitq, &cwait.wait,
TASK_UNINTERRUPTIBLE);
if (work_is_canceling(work))
schedule();
finish_wait(&wq_cancel_waitq, &cwait.wait);
goto repeat;
}
/**
* insert_work - insert a work into a pool
* @pwq: pwq @work belongs to
......@@ -1718,7 +2313,6 @@ static void __queue_work(int cpu, struct workqueue_struct *wq,
*/
lockdep_assert_irqs_disabled();
/*
* For a draining wq, only works from the same workqueue are
* allowed. The __WQ_DESTROYING helps to spot the issue that
......@@ -1793,12 +2387,16 @@ static void __queue_work(int cpu, struct workqueue_struct *wq,
pwq->nr_in_flight[pwq->work_color]++;
work_flags = work_color_to_flags(pwq->work_color);
if (likely(pwq->nr_active < pwq->max_active)) {
/*
* Limit the number of concurrently active work items to max_active.
* @work must also queue behind existing inactive work items to maintain
* ordering when max_active changes. See wq_adjust_max_active().
*/
if (list_empty(&pwq->inactive_works) && pwq_tryinc_nr_active(pwq, false)) {
if (list_empty(&pool->worklist))
pool->watchdog_ts = jiffies;
trace_workqueue_activate_work(work);
pwq->nr_active++;
insert_work(pwq, work, &pool->worklist, work_flags);
kick_pool(pool);
} else {
......@@ -1829,16 +2427,16 @@ bool queue_work_on(int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
bool ret = false;
unsigned long flags;
unsigned long irq_flags;
local_irq_save(flags);
local_irq_save(irq_flags);
if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
__queue_work(cpu, wq, work);
ret = true;
}
local_irq_restore(flags);
local_irq_restore(irq_flags);
return ret;
}
EXPORT_SYMBOL(queue_work_on);
......@@ -1895,7 +2493,7 @@ static int select_numa_node_cpu(int node)
bool queue_work_node(int node, struct workqueue_struct *wq,
struct work_struct *work)
{
unsigned long flags;
unsigned long irq_flags;
bool ret = false;
/*
......@@ -1909,7 +2507,7 @@ bool queue_work_node(int node, struct workqueue_struct *wq,
*/
WARN_ON_ONCE(!(wq->flags & WQ_UNBOUND));
local_irq_save(flags);
local_irq_save(irq_flags);
if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
int cpu = select_numa_node_cpu(node);
......@@ -1918,7 +2516,7 @@ bool queue_work_node(int node, struct workqueue_struct *wq,
ret = true;
}
local_irq_restore(flags);
local_irq_restore(irq_flags);
return ret;
}
EXPORT_SYMBOL_GPL(queue_work_node);
......@@ -1958,10 +2556,18 @@ static void __queue_delayed_work(int cpu, struct workqueue_struct *wq,
dwork->cpu = cpu;
timer->expires = jiffies + delay;
if (unlikely(cpu != WORK_CPU_UNBOUND))
if (housekeeping_enabled(HK_TYPE_TIMER)) {
/* If the current cpu is a housekeeping cpu, use it. */
cpu = smp_processor_id();
if (!housekeeping_test_cpu(cpu, HK_TYPE_TIMER))
cpu = housekeeping_any_cpu(HK_TYPE_TIMER);
add_timer_on(timer, cpu);
else
add_timer(timer);
} else {
if (likely(cpu == WORK_CPU_UNBOUND))
add_timer(timer);
else
add_timer_on(timer, cpu);
}
}
/**
......@@ -1980,17 +2586,17 @@ bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
{
struct work_struct *work = &dwork->work;
bool ret = false;
unsigned long flags;
unsigned long irq_flags;
/* read the comment in __queue_work() */
local_irq_save(flags);
local_irq_save(irq_flags);
if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
__queue_delayed_work(cpu, wq, dwork, delay);
ret = true;
}
local_irq_restore(flags);
local_irq_restore(irq_flags);
return ret;
}
EXPORT_SYMBOL(queue_delayed_work_on);
......@@ -2016,16 +2622,17 @@ EXPORT_SYMBOL(queue_delayed_work_on);
bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
struct delayed_work *dwork, unsigned long delay)
{
unsigned long flags;
unsigned long irq_flags;
int ret;
do {
ret = try_to_grab_pending(&dwork->work, true, &flags);
ret = try_to_grab_pending(&dwork->work, WORK_CANCEL_DELAYED,
&irq_flags);
} while (unlikely(ret == -EAGAIN));
if (likely(ret >= 0)) {
__queue_delayed_work(cpu, wq, dwork, delay);
local_irq_restore(flags);
local_irq_restore(irq_flags);
}
/* -ENOENT from try_to_grab_pending() becomes %true */
......@@ -2100,19 +2707,21 @@ static cpumask_t *pool_allowed_cpus(struct worker_pool *pool)
* cpu-[un]hotplugs.
*/
static void worker_attach_to_pool(struct worker *worker,
struct worker_pool *pool)
struct worker_pool *pool)
{
mutex_lock(&wq_pool_attach_mutex);
/*
* The wq_pool_attach_mutex ensures %POOL_DISASSOCIATED remains
* stable across this function. See the comments above the flag
* definition for details.
* The wq_pool_attach_mutex ensures %POOL_DISASSOCIATED remains stable
* across this function. See the comments above the flag definition for
* details. BH workers are, while per-CPU, always DISASSOCIATED.
*/
if (pool->flags & POOL_DISASSOCIATED)
if (pool->flags & POOL_DISASSOCIATED) {
worker->flags |= WORKER_UNBOUND;
else
} else {
WARN_ON_ONCE(pool->flags & POOL_BH);
kthread_set_per_cpu(worker->task, pool->cpu);
}
if (worker->rescue_wq)
set_cpus_allowed_ptr(worker->task, pool_allowed_cpus(pool));
......@@ -2136,6 +2745,9 @@ static void worker_detach_from_pool(struct worker *worker)
struct worker_pool *pool = worker->pool;
struct completion *detach_completion = NULL;
/* there is one permanent BH worker per CPU which should never detach */
WARN_ON_ONCE(pool->flags & POOL_BH);
mutex_lock(&wq_pool_attach_mutex);
kthread_set_per_cpu(worker->task, -1);
......@@ -2187,27 +2799,29 @@ static struct worker *create_worker(struct worker_pool *pool)
worker->id = id;
if (pool->cpu >= 0)
snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
pool->attrs->nice < 0 ? "H" : "");
else
snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
"kworker/%s", id_buf);
if (IS_ERR(worker->task)) {
if (PTR_ERR(worker->task) == -EINTR) {
pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
id_buf);
} else {
pr_err_once("workqueue: Failed to create a worker thread: %pe",
worker->task);
if (!(pool->flags & POOL_BH)) {
if (pool->cpu >= 0)
snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
pool->attrs->nice < 0 ? "H" : "");
else
snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
worker->task = kthread_create_on_node(worker_thread, worker,
pool->node, "kworker/%s", id_buf);
if (IS_ERR(worker->task)) {
if (PTR_ERR(worker->task) == -EINTR) {
pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
id_buf);
} else {
pr_err_once("workqueue: Failed to create a worker thread: %pe",
worker->task);
}
goto fail;
}
goto fail;
}
set_user_nice(worker->task, pool->attrs->nice);
kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
set_user_nice(worker->task, pool->attrs->nice);
kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
}
/* successful, attach the worker to the pool */
worker_attach_to_pool(worker, pool);
......@@ -2217,14 +2831,14 @@ static struct worker *create_worker(struct worker_pool *pool)
worker->pool->nr_workers++;
worker_enter_idle(worker);
kick_pool(pool);
/*
* @worker is waiting on a completion in kthread() and will trigger hung
* check if not woken up soon. As kick_pool() might not have waken it
* up, wake it up explicitly once more.
* check if not woken up soon. As kick_pool() is noop if @pool is empty,
* wake it up explicitly.
*/
wake_up_process(worker->task);
if (worker->task)
wake_up_process(worker->task);
raw_spin_unlock_irq(&pool->lock);
......@@ -2543,6 +3157,8 @@ __acquires(&pool->lock)
struct pool_workqueue *pwq = get_work_pwq(work);
struct worker_pool *pool = worker->pool;
unsigned long work_data;
int lockdep_start_depth, rcu_start_depth;
bool bh_draining = pool->flags & POOL_BH_DRAINING;
#ifdef CONFIG_LOCKDEP
/*
* It is permissible to free the struct work_struct from
......@@ -2565,7 +3181,8 @@ __acquires(&pool->lock)
worker->current_work = work;
worker->current_func = work->func;
worker->current_pwq = pwq;
worker->current_at = worker->task->se.sum_exec_runtime;
if (worker->task)
worker->current_at = worker->task->se.sum_exec_runtime;
work_data = *work_data_bits(work);
worker->current_color = get_work_color(work_data);
......@@ -2600,12 +3217,16 @@ __acquires(&pool->lock)
* PENDING and queued state changes happen together while IRQ is
* disabled.
*/
set_work_pool_and_clear_pending(work, pool->id);
set_work_pool_and_clear_pending(work, pool->id, 0);
pwq->stats[PWQ_STAT_STARTED]++;
raw_spin_unlock_irq(&pool->lock);
lock_map_acquire(&pwq->wq->lockdep_map);
rcu_start_depth = rcu_preempt_depth();
lockdep_start_depth = lockdep_depth(current);
/* see drain_dead_softirq_workfn() */
if (!bh_draining)
lock_map_acquire(&pwq->wq->lockdep_map);
lock_map_acquire(&lockdep_map);
/*
* Strictly speaking we should mark the invariant state without holding
......@@ -2638,12 +3259,17 @@ __acquires(&pool->lock)
trace_workqueue_execute_end(work, worker->current_func);
pwq->stats[PWQ_STAT_COMPLETED]++;
lock_map_release(&lockdep_map);
lock_map_release(&pwq->wq->lockdep_map);
if (!bh_draining)
lock_map_release(&pwq->wq->lockdep_map);
if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
" last function: %ps\n",
current->comm, preempt_count(), task_pid_nr(current),
if (unlikely((worker->task && in_atomic()) ||
lockdep_depth(current) != lockdep_start_depth ||
rcu_preempt_depth() != rcu_start_depth)) {
pr_err("BUG: workqueue leaked atomic, lock or RCU: %s[%d]\n"
" preempt=0x%08x lock=%d->%d RCU=%d->%d workfn=%ps\n",
current->comm, task_pid_nr(current), preempt_count(),
lockdep_start_depth, lockdep_depth(current),
rcu_start_depth, rcu_preempt_depth(),
worker->current_func);
debug_show_held_locks(current);
dump_stack();
......@@ -2657,7 +3283,8 @@ __acquires(&pool->lock)
* stop_machine. At the same time, report a quiescent RCU state so
* the same condition doesn't freeze RCU.
*/
cond_resched();
if (worker->task)
cond_resched();
raw_spin_lock_irq(&pool->lock);
......@@ -2677,6 +3304,8 @@ __acquires(&pool->lock)
worker->current_func = NULL;
worker->current_pwq = NULL;
worker->current_color = INT_MAX;
/* must be the last step, see the function comment */
pwq_dec_nr_in_flight(pwq, work_data);
}
......@@ -2938,6 +3567,139 @@ static int rescuer_thread(void *__rescuer)
goto repeat;
}
static void bh_worker(struct worker *worker)
{
struct worker_pool *pool = worker->pool;
int nr_restarts = BH_WORKER_RESTARTS;
unsigned long end = jiffies + BH_WORKER_JIFFIES;
raw_spin_lock_irq(&pool->lock);
worker_leave_idle(worker);
/*
* This function follows the structure of worker_thread(). See there for
* explanations on each step.
*/
if (!need_more_worker(pool))
goto done;
WARN_ON_ONCE(!list_empty(&worker->scheduled));
worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
do {
struct work_struct *work =
list_first_entry(&pool->worklist,
struct work_struct, entry);
if (assign_work(work, worker, NULL))
process_scheduled_works(worker);
} while (keep_working(pool) &&
--nr_restarts && time_before(jiffies, end));
worker_set_flags(worker, WORKER_PREP);
done:
worker_enter_idle(worker);
kick_pool(pool);
raw_spin_unlock_irq(&pool->lock);
}
/*
* TODO: Convert all tasklet users to workqueue and use softirq directly.
*
* This is currently called from tasklet[_hi]action() and thus is also called
* whenever there are tasklets to run. Let's do an early exit if there's nothing
* queued. Once conversion from tasklet is complete, the need_more_worker() test
* can be dropped.
*
* After full conversion, we'll add worker->softirq_action, directly use the
* softirq action and obtain the worker pointer from the softirq_action pointer.
*/
void workqueue_softirq_action(bool highpri)
{
struct worker_pool *pool =
&per_cpu(bh_worker_pools, smp_processor_id())[highpri];
if (need_more_worker(pool))
bh_worker(list_first_entry(&pool->workers, struct worker, node));
}
struct wq_drain_dead_softirq_work {
struct work_struct work;
struct worker_pool *pool;
struct completion done;
};
static void drain_dead_softirq_workfn(struct work_struct *work)
{
struct wq_drain_dead_softirq_work *dead_work =
container_of(work, struct wq_drain_dead_softirq_work, work);
struct worker_pool *pool = dead_work->pool;
bool repeat;
/*
* @pool's CPU is dead and we want to execute its still pending work
* items from this BH work item which is running on a different CPU. As
* its CPU is dead, @pool can't be kicked and, as work execution path
* will be nested, a lockdep annotation needs to be suppressed. Mark
* @pool with %POOL_BH_DRAINING for the special treatments.
*/
raw_spin_lock_irq(&pool->lock);
pool->flags |= POOL_BH_DRAINING;
raw_spin_unlock_irq(&pool->lock);
bh_worker(list_first_entry(&pool->workers, struct worker, node));
raw_spin_lock_irq(&pool->lock);
pool->flags &= ~POOL_BH_DRAINING;
repeat = need_more_worker(pool);
raw_spin_unlock_irq(&pool->lock);
/*
* bh_worker() might hit consecutive execution limit and bail. If there
* still are pending work items, reschedule self and return so that we
* don't hog this CPU's BH.
*/
if (repeat) {
if (pool->attrs->nice == HIGHPRI_NICE_LEVEL)
queue_work(system_bh_highpri_wq, work);
else
queue_work(system_bh_wq, work);
} else {
complete(&dead_work->done);
}
}
/*
* @cpu is dead. Drain the remaining BH work items on the current CPU. It's
* possible to allocate dead_work per CPU and avoid flushing. However, then we
* have to worry about draining overlapping with CPU coming back online or
* nesting (one CPU's dead_work queued on another CPU which is also dead and so
* on). Let's keep it simple and drain them synchronously. These are BH work
* items which shouldn't be requeued on the same pool. Shouldn't take long.
*/
void workqueue_softirq_dead(unsigned int cpu)
{
int i;
for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
struct worker_pool *pool = &per_cpu(bh_worker_pools, cpu)[i];
struct wq_drain_dead_softirq_work dead_work;
if (!need_more_worker(pool))
continue;
INIT_WORK(&dead_work.work, drain_dead_softirq_workfn);
dead_work.pool = pool;
init_completion(&dead_work.done);
if (pool->attrs->nice == HIGHPRI_NICE_LEVEL)
queue_work(system_bh_highpri_wq, &dead_work.work);
else
queue_work(system_bh_wq, &dead_work.work);
wait_for_completion(&dead_work.done);
}
}
/**
* check_flush_dependency - check for flush dependency sanity
* @target_wq: workqueue being flushed
......@@ -3010,6 +3772,7 @@ static void insert_wq_barrier(struct pool_workqueue *pwq,
struct wq_barrier *barr,
struct work_struct *target, struct worker *worker)
{
static __maybe_unused struct lock_class_key bh_key, thr_key;
unsigned int work_flags = 0;
unsigned int work_color;
struct list_head *head;
......@@ -3019,15 +3782,20 @@ static void insert_wq_barrier(struct pool_workqueue *pwq,
* as we know for sure that this will not trigger any of the
* checks and call back into the fixup functions where we
* might deadlock.
*
* BH and threaded workqueues need separate lockdep keys to avoid
* spuriously triggering "inconsistent {SOFTIRQ-ON-W} -> {IN-SOFTIRQ-W}
* usage".
*/
INIT_WORK_ONSTACK(&barr->work, wq_barrier_func);
INIT_WORK_ONSTACK_KEY(&barr->work, wq_barrier_func,
(pwq->wq->flags & WQ_BH) ? &bh_key : &thr_key);
__set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
init_completion_map(&barr->done, &target->lockdep_map);
barr->task = current;
/* The barrier work item does not participate in pwq->nr_active. */
/* The barrier work item does not participate in nr_active. */
work_flags |= WORK_STRUCT_INACTIVE;
/*
......@@ -3124,6 +3892,35 @@ static bool flush_workqueue_prep_pwqs(struct workqueue_struct *wq,
return wait;
}
static void touch_wq_lockdep_map(struct workqueue_struct *wq)
{
#ifdef CONFIG_LOCKDEP
if (wq->flags & WQ_BH)
local_bh_disable();
lock_map_acquire(&wq->lockdep_map);
lock_map_release(&wq->lockdep_map);
if (wq->flags & WQ_BH)
local_bh_enable();
#endif
}
static void touch_work_lockdep_map(struct work_struct *work,
struct workqueue_struct *wq)
{
#ifdef CONFIG_LOCKDEP
if (wq->flags & WQ_BH)
local_bh_disable();
lock_map_acquire(&work->lockdep_map);
lock_map_release(&work->lockdep_map);
if (wq->flags & WQ_BH)
local_bh_enable();
#endif
}
/**
* __flush_workqueue - ensure that any scheduled work has run to completion.
* @wq: workqueue to flush
......@@ -3143,8 +3940,7 @@ void __flush_workqueue(struct workqueue_struct *wq)
if (WARN_ON(!wq_online))
return;
lock_map_acquire(&wq->lockdep_map);
lock_map_release(&wq->lockdep_map);
touch_wq_lockdep_map(wq);
mutex_lock(&wq->mutex);
......@@ -3316,7 +4112,7 @@ void drain_workqueue(struct workqueue_struct *wq)
bool drained;
raw_spin_lock_irq(&pwq->pool->lock);
drained = !pwq->nr_active && list_empty(&pwq->inactive_works);
drained = pwq_is_empty(pwq);
raw_spin_unlock_irq(&pwq->pool->lock);
if (drained)
......@@ -3343,6 +4139,7 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
struct worker *worker = NULL;
struct worker_pool *pool;
struct pool_workqueue *pwq;
struct workqueue_struct *wq;
might_sleep();
......@@ -3366,11 +4163,14 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
pwq = worker->current_pwq;
}
check_flush_dependency(pwq->wq, work);
wq = pwq->wq;
check_flush_dependency(wq, work);
insert_wq_barrier(pwq, barr, work, worker);
raw_spin_unlock_irq(&pool->lock);
touch_work_lockdep_map(work, wq);
/*
* Force a lock recursion deadlock when using flush_work() inside a
* single-threaded or rescuer equipped workqueue.
......@@ -3380,11 +4180,9 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
* workqueues the deadlock happens when the rescuer stalls, blocking
* forward progress.
*/
if (!from_cancel &&
(pwq->wq->saved_max_active == 1 || pwq->wq->rescuer)) {
lock_map_acquire(&pwq->wq->lockdep_map);
lock_map_release(&pwq->wq->lockdep_map);
}
if (!from_cancel && (wq->saved_max_active == 1 || wq->rescuer))
touch_wq_lockdep_map(wq);
rcu_read_unlock();
return true;
already_gone:
......@@ -3395,144 +4193,39 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
static bool __flush_work(struct work_struct *work, bool from_cancel)
{
struct wq_barrier barr;
if (WARN_ON(!wq_online))
return false;
if (WARN_ON(!work->func))
return false;
lock_map_acquire(&work->lockdep_map);
lock_map_release(&work->lockdep_map);
if (start_flush_work(work, &barr, from_cancel)) {
wait_for_completion(&barr.done);
destroy_work_on_stack(&barr.work);
return true;
} else {
return false;
}
}
/**
* flush_work - wait for a work to finish executing the last queueing instance
* @work: the work to flush
*
* Wait until @work has finished execution. @work is guaranteed to be idle
* on return if it hasn't been requeued since flush started.
*
* Return:
* %true if flush_work() waited for the work to finish execution,
* %false if it was already idle.
*/
bool flush_work(struct work_struct *work)
{
return __flush_work(work, false);
}
EXPORT_SYMBOL_GPL(flush_work);
struct cwt_wait {
wait_queue_entry_t wait;
struct work_struct *work;
};
static int cwt_wakefn(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
struct cwt_wait *cwait = container_of(wait, struct cwt_wait, wait);
if (cwait->work != key)
return 0;
return autoremove_wake_function(wait, mode, sync, key);
}
static bool __cancel_work_timer(struct work_struct *work, bool is_dwork)
{
static DECLARE_WAIT_QUEUE_HEAD(cancel_waitq);
unsigned long flags;
int ret;
do {
ret = try_to_grab_pending(work, is_dwork, &flags);
/*
* If someone else is already canceling, wait for it to
* finish. flush_work() doesn't work for PREEMPT_NONE
* because we may get scheduled between @work's completion
* and the other canceling task resuming and clearing
* CANCELING - flush_work() will return false immediately
* as @work is no longer busy, try_to_grab_pending() will
* return -ENOENT as @work is still being canceled and the
* other canceling task won't be able to clear CANCELING as
* we're hogging the CPU.
*
* Let's wait for completion using a waitqueue. As this
* may lead to the thundering herd problem, use a custom
* wake function which matches @work along with exclusive
* wait and wakeup.
*/
if (unlikely(ret == -ENOENT)) {
struct cwt_wait cwait;
init_wait(&cwait.wait);
cwait.wait.func = cwt_wakefn;
cwait.work = work;
prepare_to_wait_exclusive(&cancel_waitq, &cwait.wait,
TASK_UNINTERRUPTIBLE);
if (work_is_canceling(work))
schedule();
finish_wait(&cancel_waitq, &cwait.wait);
}
} while (unlikely(ret < 0));
/* tell other tasks trying to grab @work to back off */
mark_work_canceling(work);
local_irq_restore(flags);
/*
* This allows canceling during early boot. We know that @work
* isn't executing.
*/
if (wq_online)
__flush_work(work, true);
struct wq_barrier barr;
clear_work_data(work);
if (WARN_ON(!wq_online))
return false;
/*
* Paired with prepare_to_wait() above so that either
* waitqueue_active() is visible here or !work_is_canceling() is
* visible there.
*/
smp_mb();
if (waitqueue_active(&cancel_waitq))
__wake_up(&cancel_waitq, TASK_NORMAL, 1, work);
if (WARN_ON(!work->func))
return false;
return ret;
if (start_flush_work(work, &barr, from_cancel)) {
wait_for_completion(&barr.done);
destroy_work_on_stack(&barr.work);
return true;
} else {
return false;
}
}
/**
* cancel_work_sync - cancel a work and wait for it to finish
* @work: the work to cancel
*
* Cancel @work and wait for its execution to finish. This function
* can be used even if the work re-queues itself or migrates to
* another workqueue. On return from this function, @work is
* guaranteed to be not pending or executing on any CPU.
*
* cancel_work_sync(&delayed_work->work) must not be used for
* delayed_work's. Use cancel_delayed_work_sync() instead.
* flush_work - wait for a work to finish executing the last queueing instance
* @work: the work to flush
*
* The caller must ensure that the workqueue on which @work was last
* queued can't be destroyed before this function returns.
* Wait until @work has finished execution. @work is guaranteed to be idle
* on return if it hasn't been requeued since flush started.
*
* Return:
* %true if @work was pending, %false otherwise.
* %true if flush_work() waited for the work to finish execution,
* %false if it was already idle.
*/
bool cancel_work_sync(struct work_struct *work)
bool flush_work(struct work_struct *work)
{
return __cancel_work_timer(work, false);
return __flush_work(work, false);
}
EXPORT_SYMBOL_GPL(cancel_work_sync);
EXPORT_SYMBOL_GPL(flush_work);
/**
* flush_delayed_work - wait for a dwork to finish executing the last queueing
......@@ -3576,20 +4269,50 @@ bool flush_rcu_work(struct rcu_work *rwork)
}
EXPORT_SYMBOL(flush_rcu_work);
static bool __cancel_work(struct work_struct *work, bool is_dwork)
static bool __cancel_work(struct work_struct *work, u32 cflags)
{
unsigned long flags;
unsigned long irq_flags;
int ret;
do {
ret = try_to_grab_pending(work, is_dwork, &flags);
ret = try_to_grab_pending(work, cflags, &irq_flags);
} while (unlikely(ret == -EAGAIN));
if (unlikely(ret < 0))
return false;
set_work_pool_and_clear_pending(work, get_work_pool_id(work));
local_irq_restore(flags);
set_work_pool_and_clear_pending(work, get_work_pool_id(work), 0);
local_irq_restore(irq_flags);
return ret;
}
static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
{
unsigned long irq_flags;
bool ret;
/* claim @work and tell other tasks trying to grab @work to back off */
ret = work_grab_pending(work, cflags, &irq_flags);
mark_work_canceling(work);
local_irq_restore(irq_flags);
/*
* Skip __flush_work() during early boot when we know that @work isn't
* executing. This allows canceling during early boot.
*/
if (wq_online)
__flush_work(work, true);
/*
* smp_mb() at the end of set_work_pool_and_clear_pending() is paired
* with prepare_to_wait() above so that either waitqueue_active() is
* visible here or !work_is_canceling() is visible there.
*/
set_work_pool_and_clear_pending(work, WORK_OFFQ_POOL_NONE, 0);
if (waitqueue_active(&wq_cancel_waitq))
__wake_up(&wq_cancel_waitq, TASK_NORMAL, 1, work);
return ret;
}
......@@ -3598,10 +4321,34 @@ static bool __cancel_work(struct work_struct *work, bool is_dwork)
*/
bool cancel_work(struct work_struct *work)
{
return __cancel_work(work, false);
return __cancel_work(work, 0);
}
EXPORT_SYMBOL(cancel_work);
/**
* cancel_work_sync - cancel a work and wait for it to finish
* @work: the work to cancel
*
* Cancel @work and wait for its execution to finish. This function
* can be used even if the work re-queues itself or migrates to
* another workqueue. On return from this function, @work is
* guaranteed to be not pending or executing on any CPU.
*
* cancel_work_sync(&delayed_work->work) must not be used for
* delayed_work's. Use cancel_delayed_work_sync() instead.
*
* The caller must ensure that the workqueue on which @work was last
* queued can't be destroyed before this function returns.
*
* Return:
* %true if @work was pending, %false otherwise.
*/
bool cancel_work_sync(struct work_struct *work)
{
return __cancel_work_sync(work, 0);
}
EXPORT_SYMBOL_GPL(cancel_work_sync);
/**
* cancel_delayed_work - cancel a delayed work
* @dwork: delayed_work to cancel
......@@ -3620,7 +4367,7 @@ EXPORT_SYMBOL(cancel_work);
*/
bool cancel_delayed_work(struct delayed_work *dwork)
{
return __cancel_work(&dwork->work, true);
return __cancel_work(&dwork->work, WORK_CANCEL_DELAYED);
}
EXPORT_SYMBOL(cancel_delayed_work);
......@@ -3635,7 +4382,7 @@ EXPORT_SYMBOL(cancel_delayed_work);
*/
bool cancel_delayed_work_sync(struct delayed_work *dwork)
{
return __cancel_work_timer(&dwork->work, true);
return __cancel_work_sync(&dwork->work, WORK_CANCEL_DELAYED);
}
EXPORT_SYMBOL(cancel_delayed_work_sync);
......@@ -3927,11 +4674,66 @@ static void wq_free_lockdep(struct workqueue_struct *wq)
}
#endif
static void free_node_nr_active(struct wq_node_nr_active **nna_ar)
{
int node;
for_each_node(node) {
kfree(nna_ar[node]);
nna_ar[node] = NULL;
}
kfree(nna_ar[nr_node_ids]);
nna_ar[nr_node_ids] = NULL;
}
static void init_node_nr_active(struct wq_node_nr_active *nna)
{
nna->max = WQ_DFL_MIN_ACTIVE;
atomic_set(&nna->nr, 0);
raw_spin_lock_init(&nna->lock);
INIT_LIST_HEAD(&nna->pending_pwqs);
}
/*
* Each node's nr_active counter will be accessed mostly from its own node and
* should be allocated in the node.
*/
static int alloc_node_nr_active(struct wq_node_nr_active **nna_ar)
{
struct wq_node_nr_active *nna;
int node;
for_each_node(node) {
nna = kzalloc_node(sizeof(*nna), GFP_KERNEL, node);
if (!nna)
goto err_free;
init_node_nr_active(nna);
nna_ar[node] = nna;
}
/* [nr_node_ids] is used as the fallback */
nna = kzalloc_node(sizeof(*nna), GFP_KERNEL, NUMA_NO_NODE);
if (!nna)
goto err_free;
init_node_nr_active(nna);
nna_ar[nr_node_ids] = nna;
return 0;
err_free:
free_node_nr_active(nna_ar);
return -ENOMEM;
}
static void rcu_free_wq(struct rcu_head *rcu)
{
struct workqueue_struct *wq =
container_of(rcu, struct workqueue_struct, rcu);
if (wq->flags & WQ_UNBOUND)
free_node_nr_active(wq->node_nr_active);
wq_free_lockdep(wq);
free_percpu(wq->cpu_pwq);
free_workqueue_attrs(wq->unbound_attrs);
......@@ -4121,6 +4923,13 @@ static void pwq_release_workfn(struct kthread_work *work)
mutex_lock(&wq->mutex);
list_del_rcu(&pwq->pwqs_node);
is_last = list_empty(&wq->pwqs);
/*
* For ordered workqueue with a plugged dfl_pwq, restart it now.
*/
if (!is_last && (wq->flags & __WQ_ORDERED))
unplug_oldest_pwq(wq);
mutex_unlock(&wq->mutex);
}
......@@ -4130,6 +4939,15 @@ static void pwq_release_workfn(struct kthread_work *work)
mutex_unlock(&wq_pool_mutex);
}
if (!list_empty(&pwq->pending_node)) {
struct wq_node_nr_active *nna =
wq_node_nr_active(pwq->wq, pwq->pool->node);
raw_spin_lock_irq(&nna->lock);
list_del_init(&pwq->pending_node);
raw_spin_unlock_irq(&nna->lock);
}
call_rcu(&pwq->rcu, rcu_free_pwq);
/*
......@@ -4142,55 +4960,11 @@ static void pwq_release_workfn(struct kthread_work *work)
}
}
/**
* pwq_adjust_max_active - update a pwq's max_active to the current setting
* @pwq: target pool_workqueue
*
* If @pwq isn't freezing, set @pwq->max_active to the associated
* workqueue's saved_max_active and activate inactive work items
* accordingly. If @pwq is freezing, clear @pwq->max_active to zero.
*/
static void pwq_adjust_max_active(struct pool_workqueue *pwq)
{
struct workqueue_struct *wq = pwq->wq;
bool freezable = wq->flags & WQ_FREEZABLE;
unsigned long flags;
/* for @wq->saved_max_active */
lockdep_assert_held(&wq->mutex);
/* fast exit for non-freezable wqs */
if (!freezable && pwq->max_active == wq->saved_max_active)
return;
/* this function can be called during early boot w/ irq disabled */
raw_spin_lock_irqsave(&pwq->pool->lock, flags);
/*
* During [un]freezing, the caller is responsible for ensuring that
* this function is called at least once after @workqueue_freezing
* is updated and visible.
*/
if (!freezable || !workqueue_freezing) {
pwq->max_active = wq->saved_max_active;
while (!list_empty(&pwq->inactive_works) &&
pwq->nr_active < pwq->max_active)
pwq_activate_first_inactive(pwq);
kick_pool(pwq->pool);
} else {
pwq->max_active = 0;
}
raw_spin_unlock_irqrestore(&pwq->pool->lock, flags);
}
/* initialize newly allocated @pwq which is associated with @wq and @pool */
static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
struct worker_pool *pool)
{
BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK);
BUG_ON((unsigned long)pwq & ~WORK_STRUCT_PWQ_MASK);
memset(pwq, 0, sizeof(*pwq));
......@@ -4199,6 +4973,7 @@ static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
pwq->flush_color = -1;
pwq->refcnt = 1;
INIT_LIST_HEAD(&pwq->inactive_works);
INIT_LIST_HEAD(&pwq->pending_node);
INIT_LIST_HEAD(&pwq->pwqs_node);
INIT_LIST_HEAD(&pwq->mayday_node);
kthread_init_work(&pwq->release_work, pwq_release_workfn);
......@@ -4218,11 +4993,8 @@ static void link_pwq(struct pool_workqueue *pwq)
/* set the matching work_color */
pwq->work_color = wq->work_color;
/* sync max_active to the current setting */
pwq_adjust_max_active(pwq);
/* link in @pwq */
list_add_rcu(&pwq->pwqs_node, &wq->pwqs);
list_add_tail_rcu(&pwq->pwqs_node, &wq->pwqs);
}
/* obtain a pool matching @attr and create a pwq associating the pool and @wq */
......@@ -4289,10 +5061,11 @@ static void wq_calc_pod_cpumask(struct workqueue_attrs *attrs, int cpu,
"possible intersect\n");
}
/* install @pwq into @wq's cpu_pwq and return the old pwq */
/* install @pwq into @wq and return the old pwq, @cpu < 0 for dfl_pwq */
static struct pool_workqueue *install_unbound_pwq(struct workqueue_struct *wq,
int cpu, struct pool_workqueue *pwq)
{
struct pool_workqueue __rcu **slot = unbound_pwq_slot(wq, cpu);
struct pool_workqueue *old_pwq;
lockdep_assert_held(&wq_pool_mutex);
......@@ -4301,8 +5074,8 @@ static struct pool_workqueue *install_unbound_pwq(struct workqueue_struct *wq,
/* link_pwq() can handle duplicate calls */
link_pwq(pwq);
old_pwq = rcu_access_pointer(*per_cpu_ptr(wq->cpu_pwq, cpu));
rcu_assign_pointer(*per_cpu_ptr(wq->cpu_pwq, cpu), pwq);
old_pwq = rcu_access_pointer(*slot);
rcu_assign_pointer(*slot, pwq);
return old_pwq;
}
......@@ -4383,6 +5156,15 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
cpumask_copy(new_attrs->__pod_cpumask, new_attrs->cpumask);
ctx->attrs = new_attrs;
/*
* For initialized ordered workqueues, there should only be one pwq
* (dfl_pwq). Set the plugged flag of ctx->dfl_pwq to suspend execution
* of newly queued work items until execution of older work items in
* the old pwq's have completed.
*/
if ((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs))
ctx->dfl_pwq->plugged = true;
ctx->wq = wq;
return ctx;
......@@ -4402,14 +5184,19 @@ static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);
/* save the previous pwq and install the new one */
/* save the previous pwqs and install the new ones */
for_each_possible_cpu(cpu)
ctx->pwq_tbl[cpu] = install_unbound_pwq(ctx->wq, cpu,
ctx->pwq_tbl[cpu]);
ctx->dfl_pwq = install_unbound_pwq(ctx->wq, -1, ctx->dfl_pwq);
/* update node_nr_active->max */
wq_update_node_max_active(ctx->wq, -1);
/* @dfl_pwq might not have been used, ensure it's linked */
link_pwq(ctx->dfl_pwq);
swap(ctx->wq->dfl_pwq, ctx->dfl_pwq);
/* rescuer needs to respect wq cpumask changes */
if (ctx->wq->rescuer)
set_cpus_allowed_ptr(ctx->wq->rescuer->task,
unbound_effective_cpumask(ctx->wq));
mutex_unlock(&ctx->wq->mutex);
}
......@@ -4423,14 +5210,6 @@ static int apply_workqueue_attrs_locked(struct workqueue_struct *wq,
if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
return -EINVAL;
/* creating multiple pwqs breaks ordering guarantee */
if (!list_empty(&wq->pwqs)) {
if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
return -EINVAL;
wq->flags &= ~__WQ_ORDERED;
}
ctx = apply_wqattrs_prepare(wq, attrs, wq_unbound_cpumask);
if (IS_ERR(ctx))
return PTR_ERR(ctx);
......@@ -4519,9 +5298,7 @@ static void wq_update_pod(struct workqueue_struct *wq, int cpu,
/* nothing to do if the target cpumask matches the current pwq */
wq_calc_pod_cpumask(target_attrs, cpu, off_cpu);
pwq = rcu_dereference_protected(*per_cpu_ptr(wq->cpu_pwq, cpu),
lockdep_is_held(&wq_pool_mutex));
if (wqattrs_equal(target_attrs, pwq->pool->attrs))
if (wqattrs_equal(target_attrs, unbound_pwq(wq, cpu)->pool->attrs))
return;
/* create a new pwq */
......@@ -4539,10 +5316,11 @@ static void wq_update_pod(struct workqueue_struct *wq, int cpu,
use_dfl_pwq:
mutex_lock(&wq->mutex);
raw_spin_lock_irq(&wq->dfl_pwq->pool->lock);
get_pwq(wq->dfl_pwq);
raw_spin_unlock_irq(&wq->dfl_pwq->pool->lock);
old_pwq = install_unbound_pwq(wq, cpu, wq->dfl_pwq);
pwq = unbound_pwq(wq, -1);
raw_spin_lock_irq(&pwq->pool->lock);
get_pwq(pwq);
raw_spin_unlock_irq(&pwq->pool->lock);
old_pwq = install_unbound_pwq(wq, cpu, pwq);
out_unlock:
mutex_unlock(&wq->mutex);
put_pwq_unlocked(old_pwq);
......@@ -4559,10 +5337,17 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
if (!(wq->flags & WQ_UNBOUND)) {
for_each_possible_cpu(cpu) {
struct pool_workqueue **pwq_p =
per_cpu_ptr(wq->cpu_pwq, cpu);
struct worker_pool *pool =
&(per_cpu_ptr(cpu_worker_pools, cpu)[highpri]);
struct pool_workqueue **pwq_p;
struct worker_pool __percpu *pools;
struct worker_pool *pool;
if (wq->flags & WQ_BH)
pools = bh_worker_pools;
else
pools = cpu_worker_pools;
pool = &(per_cpu_ptr(pools, cpu)[highpri]);
pwq_p = per_cpu_ptr(wq->cpu_pwq, cpu);
*pwq_p = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL,
pool->node);
......@@ -4580,10 +5365,13 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
cpus_read_lock();
if (wq->flags & __WQ_ORDERED) {
struct pool_workqueue *dfl_pwq;
ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
/* there should only be single pwq for ordering guarantee */
WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
dfl_pwq = rcu_access_pointer(wq->dfl_pwq);
WARN(!ret && (wq->pwqs.next != &dfl_pwq->pwqs_node ||
wq->pwqs.prev != &dfl_pwq->pwqs_node),
"ordering guarantee broken for workqueue %s\n", wq->name);
} else {
ret = apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
......@@ -4652,12 +5440,78 @@ static int init_rescuer(struct workqueue_struct *wq)
}
wq->rescuer = rescuer;
kthread_bind_mask(rescuer->task, cpu_possible_mask);
if (wq->flags & WQ_UNBOUND)
kthread_bind_mask(rescuer->task, wq_unbound_cpumask);
else
kthread_bind_mask(rescuer->task, cpu_possible_mask);
wake_up_process(rescuer->task);
return 0;
}
/**
* wq_adjust_max_active - update a wq's max_active to the current setting
* @wq: target workqueue
*
* If @wq isn't freezing, set @wq->max_active to the saved_max_active and
* activate inactive work items accordingly. If @wq is freezing, clear
* @wq->max_active to zero.
*/
static void wq_adjust_max_active(struct workqueue_struct *wq)
{
bool activated;
int new_max, new_min;
lockdep_assert_held(&wq->mutex);
if ((wq->flags & WQ_FREEZABLE) && workqueue_freezing) {
new_max = 0;
new_min = 0;
} else {
new_max = wq->saved_max_active;
new_min = wq->saved_min_active;
}
if (wq->max_active == new_max && wq->min_active == new_min)
return;
/*
* Update @wq->max/min_active and then kick inactive work items if more
* active work items are allowed. This doesn't break work item ordering
* because new work items are always queued behind existing inactive
* work items if there are any.
*/
WRITE_ONCE(wq->max_active, new_max);
WRITE_ONCE(wq->min_active, new_min);
if (wq->flags & WQ_UNBOUND)
wq_update_node_max_active(wq, -1);
if (new_max == 0)
return;
/*
* Round-robin through pwq's activating the first inactive work item
* until max_active is filled.
*/
do {
struct pool_workqueue *pwq;
activated = false;
for_each_pwq(pwq, wq) {
unsigned long irq_flags;
/* can be called during early boot w/ irq disabled */
raw_spin_lock_irqsave(&pwq->pool->lock, irq_flags);
if (pwq_activate_first_inactive(pwq, true)) {
activated = true;
kick_pool(pwq->pool);
}
raw_spin_unlock_irqrestore(&pwq->pool->lock, irq_flags);
}
} while (activated);
}
__printf(1, 4)
struct workqueue_struct *alloc_workqueue(const char *fmt,
unsigned int flags,
......@@ -4665,23 +5519,27 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
{
va_list args;
struct workqueue_struct *wq;
struct pool_workqueue *pwq;
size_t wq_size;
int name_len;
/*
* Unbound && max_active == 1 used to imply ordered, which is no longer
* the case on many machines due to per-pod pools. While
* alloc_ordered_workqueue() is the right way to create an ordered
* workqueue, keep the previous behavior to avoid subtle breakages.
*/
if ((flags & WQ_UNBOUND) && max_active == 1)
flags |= __WQ_ORDERED;
if (flags & WQ_BH) {
if (WARN_ON_ONCE(flags & ~__WQ_BH_ALLOWS))
return NULL;
if (WARN_ON_ONCE(max_active))
return NULL;
}
/* see the comment above the definition of WQ_POWER_EFFICIENT */
if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
flags |= WQ_UNBOUND;
/* allocate wq and format name */
wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (flags & WQ_UNBOUND)
wq_size = struct_size(wq, node_nr_active, nr_node_ids + 1);
else
wq_size = sizeof(*wq);
wq = kzalloc(wq_size, GFP_KERNEL);
if (!wq)
return NULL;
......@@ -4692,15 +5550,30 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
}
va_start(args, max_active);
vsnprintf(wq->name, sizeof(wq->name), fmt, args);
name_len = vsnprintf(wq->name, sizeof(wq->name), fmt, args);
va_end(args);
max_active = max_active ?: WQ_DFL_ACTIVE;
max_active = wq_clamp_max_active(max_active, flags, wq->name);
if (name_len >= WQ_NAME_LEN)
pr_warn_once("workqueue: name exceeds WQ_NAME_LEN. Truncating to: %s\n",
wq->name);
if (flags & WQ_BH) {
/*
* BH workqueues always share a single execution context per CPU
* and don't impose any max_active limit.
*/
max_active = INT_MAX;
} else {
max_active = max_active ?: WQ_DFL_ACTIVE;
max_active = wq_clamp_max_active(max_active, flags, wq->name);
}
/* init wq */
wq->flags = flags;
wq->saved_max_active = max_active;
wq->max_active = max_active;
wq->min_active = min(max_active, WQ_DFL_MIN_ACTIVE);
wq->saved_max_active = wq->max_active;
wq->saved_min_active = wq->min_active;
mutex_init(&wq->mutex);
atomic_set(&wq->nr_pwqs_to_flush, 0);
INIT_LIST_HEAD(&wq->pwqs);
......@@ -4711,8 +5584,13 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
wq_init_lockdep(wq);
INIT_LIST_HEAD(&wq->list);
if (flags & WQ_UNBOUND) {
if (alloc_node_nr_active(wq->node_nr_active) < 0)
goto err_unreg_lockdep;
}
if (alloc_and_link_pwqs(wq) < 0)
goto err_unreg_lockdep;
goto err_free_node_nr_active;
if (wq_online && init_rescuer(wq) < 0)
goto err_destroy;
......@@ -4728,8 +5606,7 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
mutex_lock(&wq_pool_mutex);
mutex_lock(&wq->mutex);
for_each_pwq(pwq, wq)
pwq_adjust_max_active(pwq);
wq_adjust_max_active(wq);
mutex_unlock(&wq->mutex);
list_add_tail_rcu(&wq->list, &workqueues);
......@@ -4738,6 +5615,9 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
return wq;
err_free_node_nr_active:
if (wq->flags & WQ_UNBOUND)
free_node_nr_active(wq->node_nr_active);
err_unreg_lockdep:
wq_unregister_lockdep(wq);
wq_free_lockdep(wq);
......@@ -4759,9 +5639,9 @@ static bool pwq_busy(struct pool_workqueue *pwq)
if (pwq->nr_in_flight[i])
return true;
if ((pwq != pwq->wq->dfl_pwq) && (pwq->refcnt > 1))
if ((pwq != rcu_access_pointer(pwq->wq->dfl_pwq)) && (pwq->refcnt > 1))
return true;
if (pwq->nr_active || !list_empty(&pwq->inactive_works))
if (!pwq_is_empty(pwq))
return true;
return false;
......@@ -4843,13 +5723,12 @@ void destroy_workqueue(struct workqueue_struct *wq)
rcu_read_lock();
for_each_possible_cpu(cpu) {
pwq = rcu_access_pointer(*per_cpu_ptr(wq->cpu_pwq, cpu));
RCU_INIT_POINTER(*per_cpu_ptr(wq->cpu_pwq, cpu), NULL);
put_pwq_unlocked(pwq);
put_pwq_unlocked(unbound_pwq(wq, cpu));
RCU_INIT_POINTER(*unbound_pwq_slot(wq, cpu), NULL);
}
put_pwq_unlocked(wq->dfl_pwq);
wq->dfl_pwq = NULL;
put_pwq_unlocked(unbound_pwq(wq, -1));
RCU_INIT_POINTER(*unbound_pwq_slot(wq, -1), NULL);
rcu_read_unlock();
}
......@@ -4860,33 +5739,62 @@ EXPORT_SYMBOL_GPL(destroy_workqueue);
* @wq: target workqueue
* @max_active: new max_active value.
*
* Set max_active of @wq to @max_active.
* Set max_active of @wq to @max_active. See the alloc_workqueue() function
* comment.
*
* CONTEXT:
* Don't call from IRQ context.
*/
void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
{
struct pool_workqueue *pwq;
/* max_active doesn't mean anything for BH workqueues */
if (WARN_ON(wq->flags & WQ_BH))
return;
/* disallow meddling with max_active for ordered workqueues */
if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
if (WARN_ON(wq->flags & __WQ_ORDERED))
return;
max_active = wq_clamp_max_active(max_active, wq->flags, wq->name);
mutex_lock(&wq->mutex);
wq->flags &= ~__WQ_ORDERED;
wq->saved_max_active = max_active;
if (wq->flags & WQ_UNBOUND)
wq->saved_min_active = min(wq->saved_min_active, max_active);
for_each_pwq(pwq, wq)
pwq_adjust_max_active(pwq);
wq_adjust_max_active(wq);
mutex_unlock(&wq->mutex);
}
EXPORT_SYMBOL_GPL(workqueue_set_max_active);
/**
* workqueue_set_min_active - adjust min_active of an unbound workqueue
* @wq: target unbound workqueue
* @min_active: new min_active value
*
* Set min_active of an unbound workqueue. Unlike other types of workqueues, an
* unbound workqueue is not guaranteed to be able to process max_active
* interdependent work items. Instead, an unbound workqueue is guaranteed to be
* able to process min_active number of interdependent work items which is
* %WQ_DFL_MIN_ACTIVE by default.
*
* Use this function to adjust the min_active value between 0 and the current
* max_active.
*/
void workqueue_set_min_active(struct workqueue_struct *wq, int min_active)
{
/* min_active is only meaningful for non-ordered unbound workqueues */
if (WARN_ON((wq->flags & (WQ_BH | WQ_UNBOUND | __WQ_ORDERED)) !=
WQ_UNBOUND))
return;
mutex_lock(&wq->mutex);
wq->saved_min_active = clamp(min_active, 0, wq->saved_max_active);
wq_adjust_max_active(wq);
mutex_unlock(&wq->mutex);
}
/**
* current_work - retrieve %current task's work struct
*
......@@ -4972,7 +5880,7 @@ EXPORT_SYMBOL_GPL(workqueue_congested);
unsigned int work_busy(struct work_struct *work)
{
struct worker_pool *pool;
unsigned long flags;
unsigned long irq_flags;
unsigned int ret = 0;
if (work_pending(work))
......@@ -4981,10 +5889,10 @@ unsigned int work_busy(struct work_struct *work)
rcu_read_lock();
pool = get_work_pool(work);
if (pool) {
raw_spin_lock_irqsave(&pool->lock, flags);
raw_spin_lock_irqsave(&pool->lock, irq_flags);
if (find_worker_executing_work(pool, work))
ret |= WORK_BUSY_RUNNING;
raw_spin_unlock_irqrestore(&pool->lock, flags);
raw_spin_unlock_irqrestore(&pool->lock, irq_flags);
}
rcu_read_unlock();
......@@ -5069,7 +5977,24 @@ static void pr_cont_pool_info(struct worker_pool *pool)
pr_cont(" cpus=%*pbl", nr_cpumask_bits, pool->attrs->cpumask);
if (pool->node != NUMA_NO_NODE)
pr_cont(" node=%d", pool->node);
pr_cont(" flags=0x%x nice=%d", pool->flags, pool->attrs->nice);
pr_cont(" flags=0x%x", pool->flags);
if (pool->flags & POOL_BH)
pr_cont(" bh%s",
pool->attrs->nice == HIGHPRI_NICE_LEVEL ? "-hi" : "");
else
pr_cont(" nice=%d", pool->attrs->nice);
}
static void pr_cont_worker_id(struct worker *worker)
{
struct worker_pool *pool = worker->pool;
if (pool->flags & WQ_BH)
pr_cont("bh%s",
pool->attrs->nice == HIGHPRI_NICE_LEVEL ? "-hi" : "");
else
pr_cont("%d%s", task_pid_nr(worker->task),
worker->rescue_wq ? "(RESCUER)" : "");
}
struct pr_cont_work_struct {
......@@ -5128,8 +6053,8 @@ static void show_pwq(struct pool_workqueue *pwq)
pr_info(" pwq %d:", pool->id);
pr_cont_pool_info(pool);
pr_cont(" active=%d/%d refcnt=%d%s\n",
pwq->nr_active, pwq->max_active, pwq->refcnt,
pr_cont(" active=%d refcnt=%d%s\n",
pwq->nr_active, pwq->refcnt,
!list_empty(&pwq->mayday_node) ? " MAYDAY" : "");
hash_for_each(pool->busy_hash, bkt, worker, hentry) {
......@@ -5146,10 +6071,9 @@ static void show_pwq(struct pool_workqueue *pwq)
if (worker->current_pwq != pwq)
continue;
pr_cont("%s %d%s:%ps", comma ? "," : "",
task_pid_nr(worker->task),
worker->rescue_wq ? "(RESCUER)" : "",
worker->current_func);
pr_cont(" %s", comma ? "," : "");
pr_cont_worker_id(worker);
pr_cont(":%ps", worker->current_func);
list_for_each_entry(work, &worker->scheduled, entry)
pr_cont_work(false, work, &pcws);
pr_cont_work_flush(comma, (work_func_t)-1L, &pcws);
......@@ -5200,10 +6124,10 @@ void show_one_workqueue(struct workqueue_struct *wq)
{
struct pool_workqueue *pwq;
bool idle = true;
unsigned long flags;
unsigned long irq_flags;
for_each_pwq(pwq, wq) {
if (pwq->nr_active || !list_empty(&pwq->inactive_works)) {
if (!pwq_is_empty(pwq)) {
idle = false;
break;
}
......@@ -5214,8 +6138,8 @@ void show_one_workqueue(struct workqueue_struct *wq)
pr_info("workqueue %s: flags=0x%x\n", wq->name, wq->flags);
for_each_pwq(pwq, wq) {
raw_spin_lock_irqsave(&pwq->pool->lock, flags);
if (pwq->nr_active || !list_empty(&pwq->inactive_works)) {
raw_spin_lock_irqsave(&pwq->pool->lock, irq_flags);
if (!pwq_is_empty(pwq)) {
/*
* Defer printing to avoid deadlocks in console
* drivers that queue work while holding locks
......@@ -5225,7 +6149,7 @@ void show_one_workqueue(struct workqueue_struct *wq)
show_pwq(pwq);
printk_deferred_exit();
}
raw_spin_unlock_irqrestore(&pwq->pool->lock, flags);
raw_spin_unlock_irqrestore(&pwq->pool->lock, irq_flags);
/*
* We could be printing a lot from atomic context, e.g.
* sysrq-t -> show_all_workqueues(). Avoid triggering
......@@ -5244,10 +6168,10 @@ static void show_one_worker_pool(struct worker_pool *pool)
{
struct worker *worker;
bool first = true;
unsigned long flags;
unsigned long irq_flags;
unsigned long hung = 0;
raw_spin_lock_irqsave(&pool->lock, flags);
raw_spin_lock_irqsave(&pool->lock, irq_flags);
if (pool->nr_workers == pool->nr_idle)
goto next_pool;
......@@ -5268,14 +6192,14 @@ static void show_one_worker_pool(struct worker_pool *pool)
pr_cont(" manager: %d",
task_pid_nr(pool->manager->task));
list_for_each_entry(worker, &pool->idle_list, entry) {
pr_cont(" %s%d", first ? "idle: " : "",
task_pid_nr(worker->task));
pr_cont(" %s", first ? "idle: " : "");
pr_cont_worker_id(worker);
first = false;
}
pr_cont("\n");
printk_deferred_exit();
next_pool:
raw_spin_unlock_irqrestore(&pool->lock, flags);
raw_spin_unlock_irqrestore(&pool->lock, irq_flags);
/*
* We could be printing a lot from atomic context, e.g.
* sysrq-t -> show_all_workqueues(). Avoid triggering
......@@ -5542,13 +6466,15 @@ int workqueue_online_cpu(unsigned int cpu)
mutex_lock(&wq_pool_mutex);
for_each_pool(pool, pi) {
mutex_lock(&wq_pool_attach_mutex);
/* BH pools aren't affected by hotplug */
if (pool->flags & POOL_BH)
continue;
mutex_lock(&wq_pool_attach_mutex);
if (pool->cpu == cpu)
rebind_workers(pool);
else if (pool->cpu < 0)
restore_unbound_workers_cpumask(pool, cpu);
mutex_unlock(&wq_pool_attach_mutex);
}
......@@ -5562,6 +6488,10 @@ int workqueue_online_cpu(unsigned int cpu)
for_each_cpu(tcpu, pt->pod_cpus[pt->cpu_pod[cpu]])
wq_update_pod(wq, tcpu, cpu, true);
mutex_lock(&wq->mutex);
wq_update_node_max_active(wq, -1);
mutex_unlock(&wq->mutex);
}
}
......@@ -5590,6 +6520,10 @@ int workqueue_offline_cpu(unsigned int cpu)
for_each_cpu(tcpu, pt->pod_cpus[pt->cpu_pod[cpu]])
wq_update_pod(wq, tcpu, cpu, false);
mutex_lock(&wq->mutex);
wq_update_node_max_active(wq, cpu);
mutex_unlock(&wq->mutex);
}
}
mutex_unlock(&wq_pool_mutex);
......@@ -5677,7 +6611,6 @@ EXPORT_SYMBOL_GPL(work_on_cpu_safe_key);
void freeze_workqueues_begin(void)
{
struct workqueue_struct *wq;
struct pool_workqueue *pwq;
mutex_lock(&wq_pool_mutex);
......@@ -5686,8 +6619,7 @@ void freeze_workqueues_begin(void)
list_for_each_entry(wq, &workqueues, list) {
mutex_lock(&wq->mutex);
for_each_pwq(pwq, wq)
pwq_adjust_max_active(pwq);
wq_adjust_max_active(wq);
mutex_unlock(&wq->mutex);
}
......@@ -5752,7 +6684,6 @@ bool freeze_workqueues_busy(void)
void thaw_workqueues(void)
{
struct workqueue_struct *wq;
struct pool_workqueue *pwq;
mutex_lock(&wq_pool_mutex);
......@@ -5764,8 +6695,7 @@ void thaw_workqueues(void)
/* restore max_active and repopulate worklist */
list_for_each_entry(wq, &workqueues, list) {
mutex_lock(&wq->mutex);
for_each_pwq(pwq, wq)
pwq_adjust_max_active(pwq);
wq_adjust_max_active(wq);
mutex_unlock(&wq->mutex);
}
......@@ -5784,16 +6714,9 @@ static int workqueue_apply_unbound_cpumask(const cpumask_var_t unbound_cpumask)
lockdep_assert_held(&wq_pool_mutex);
list_for_each_entry(wq, &workqueues, list) {
if (!(wq->flags & WQ_UNBOUND))
if (!(wq->flags & WQ_UNBOUND) || (wq->flags & __WQ_DESTROYING))
continue;
/* creating multiple pwqs breaks ordering guarantee */
if (!list_empty(&wq->pwqs)) {
if (wq->flags & __WQ_ORDERED_EXPLICIT)
continue;
wq->flags &= ~__WQ_ORDERED;
}
ctx = apply_wqattrs_prepare(wq, wq->unbound_attrs, unbound_cpumask);
if (IS_ERR(ctx)) {
ret = PTR_ERR(ctx);
......@@ -6307,11 +7230,10 @@ int workqueue_sysfs_register(struct workqueue_struct *wq)
int ret;
/*
* Adjusting max_active or creating new pwqs by applying
* attributes breaks ordering guarantee. Disallow exposing ordered
* workqueues.
* Adjusting max_active breaks ordering guarantee. Disallow exposing
* ordered workqueues.
*/
if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
if (WARN_ON(wq->flags & __WQ_ORDERED))
return -EINVAL;
wq->wq_dev = wq_dev = kzalloc(sizeof(*wq_dev), GFP_KERNEL);
......@@ -6408,10 +7330,10 @@ static DEFINE_PER_CPU(unsigned long, wq_watchdog_touched_cpu) = INITIAL_JIFFIES;
static void show_cpu_pool_hog(struct worker_pool *pool)
{
struct worker *worker;
unsigned long flags;
unsigned long irq_flags;
int bkt;
raw_spin_lock_irqsave(&pool->lock, flags);
raw_spin_lock_irqsave(&pool->lock, irq_flags);
hash_for_each(pool->busy_hash, bkt, worker, hentry) {
if (task_is_running(worker->task)) {
......@@ -6429,7 +7351,7 @@ static void show_cpu_pool_hog(struct worker_pool *pool)
}
}
raw_spin_unlock_irqrestore(&pool->lock, flags);
raw_spin_unlock_irqrestore(&pool->lock, irq_flags);
}
static void show_cpu_pools_hogs(void)
......@@ -6501,7 +7423,7 @@ static void wq_watchdog_timer_fn(struct timer_list *unused)
/* did we stall? */
if (time_after(now, ts + thresh)) {
lockup_detected = true;
if (pool->cpu >= 0) {
if (pool->cpu >= 0 && !(pool->flags & POOL_BH)) {
pool->cpu_stall = true;
cpu_pool_stall = true;
}
......@@ -6584,6 +7506,16 @@ static inline void wq_watchdog_init(void) { }
#endif /* CONFIG_WQ_WATCHDOG */
static void bh_pool_kick_normal(struct irq_work *irq_work)
{
raise_softirq_irqoff(TASKLET_SOFTIRQ);
}
static void bh_pool_kick_highpri(struct irq_work *irq_work)
{
raise_softirq_irqoff(HI_SOFTIRQ);
}
static void __init restrict_unbound_cpumask(const char *name, const struct cpumask *mask)
{
if (!cpumask_intersects(wq_unbound_cpumask, mask)) {
......@@ -6595,6 +7527,22 @@ static void __init restrict_unbound_cpumask(const char *name, const struct cpuma
cpumask_and(wq_unbound_cpumask, wq_unbound_cpumask, mask);
}
static void __init init_cpu_worker_pool(struct worker_pool *pool, int cpu, int nice)
{
BUG_ON(init_worker_pool(pool));
pool->cpu = cpu;
cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
pool->attrs->nice = nice;
pool->attrs->affn_strict = true;
pool->node = cpu_to_node(cpu);
/* alloc pool ID */
mutex_lock(&wq_pool_mutex);
BUG_ON(worker_pool_assign_id(pool));
mutex_unlock(&wq_pool_mutex);
}
/**
* workqueue_init_early - early init for workqueue subsystem
*
......@@ -6609,6 +7557,8 @@ void __init workqueue_init_early(void)
{
struct wq_pod_type *pt = &wq_pod_types[WQ_AFFN_SYSTEM];
int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
void (*irq_work_fns[2])(struct irq_work *) = { bh_pool_kick_normal,
bh_pool_kick_highpri };
int i, cpu;
BUILD_BUG_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));
......@@ -6630,6 +7580,13 @@ void __init workqueue_init_early(void)
wq_update_pod_attrs_buf = alloc_workqueue_attrs();
BUG_ON(!wq_update_pod_attrs_buf);
/*
* If nohz_full is enabled, set power efficient workqueue as unbound.
* This allows workqueue items to be moved to HK CPUs.
*/
if (housekeeping_enabled(HK_TYPE_TICK))
wq_power_efficient = true;
/* initialize WQ_AFFN_SYSTEM pods */
pt->pod_cpus = kcalloc(1, sizeof(pt->pod_cpus[0]), GFP_KERNEL);
pt->pod_node = kcalloc(1, sizeof(pt->pod_node[0]), GFP_KERNEL);
......@@ -6643,25 +7600,21 @@ void __init workqueue_init_early(void)
pt->pod_node[0] = NUMA_NO_NODE;
pt->cpu_pod[0] = 0;
/* initialize CPU pools */
/* initialize BH and CPU pools */
for_each_possible_cpu(cpu) {
struct worker_pool *pool;
i = 0;
for_each_cpu_worker_pool(pool, cpu) {
BUG_ON(init_worker_pool(pool));
pool->cpu = cpu;
cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
pool->attrs->nice = std_nice[i++];
pool->attrs->affn_strict = true;
pool->node = cpu_to_node(cpu);
/* alloc pool ID */
mutex_lock(&wq_pool_mutex);
BUG_ON(worker_pool_assign_id(pool));
mutex_unlock(&wq_pool_mutex);
for_each_bh_worker_pool(pool, cpu) {
init_cpu_worker_pool(pool, cpu, std_nice[i]);
pool->flags |= POOL_BH;
init_irq_work(bh_pool_irq_work(pool), irq_work_fns[i]);
i++;
}
i = 0;
for_each_cpu_worker_pool(pool, cpu)
init_cpu_worker_pool(pool, cpu, std_nice[i++]);
}
/* create default unbound and ordered wq attrs */
......@@ -6691,13 +7644,17 @@ void __init workqueue_init_early(void)
WQ_FREEZABLE, 0);
system_power_efficient_wq = alloc_workqueue("events_power_efficient",
WQ_POWER_EFFICIENT, 0);
system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",
system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_pwr_efficient",
WQ_FREEZABLE | WQ_POWER_EFFICIENT,
0);
system_bh_wq = alloc_workqueue("events_bh", WQ_BH, 0);
system_bh_highpri_wq = alloc_workqueue("events_bh_highpri",
WQ_BH | WQ_HIGHPRI, 0);
BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
!system_unbound_wq || !system_freezable_wq ||
!system_power_efficient_wq ||
!system_freezable_power_efficient_wq);
!system_freezable_power_efficient_wq ||
!system_bh_wq || !system_bh_highpri_wq);
}
static void __init wq_cpu_intensive_thresh_init(void)
......@@ -6763,9 +7720,10 @@ void __init workqueue_init(void)
* up. Also, create a rescuer for workqueues that requested it.
*/
for_each_possible_cpu(cpu) {
for_each_cpu_worker_pool(pool, cpu) {
for_each_bh_worker_pool(pool, cpu)
pool->node = cpu_to_node(cpu);
for_each_cpu_worker_pool(pool, cpu)
pool->node = cpu_to_node(cpu);
}
}
list_for_each_entry(wq, &workqueues, list) {
......@@ -6776,7 +7734,16 @@ void __init workqueue_init(void)
mutex_unlock(&wq_pool_mutex);
/* create the initial workers */
/*
* Create the initial workers. A BH pool has one pseudo worker that
* represents the shared BH execution context and thus doesn't get
* affected by hotplug events. Create the BH pseudo workers for all
* possible CPUs here.
*/
for_each_possible_cpu(cpu)
for_each_bh_worker_pool(pool, cpu)
BUG_ON(!create_worker(pool));
for_each_online_cpu(cpu) {
for_each_cpu_worker_pool(pool, cpu) {
pool->flags &= ~POOL_DISASSOCIATED;
......@@ -6856,7 +7823,7 @@ static bool __init cpus_share_numa(int cpu0, int cpu1)
/**
* workqueue_init_topology - initialize CPU pods for unbound workqueues
*
* This is the third step of there-staged workqueue subsystem initialization and
* This is the third step of three-staged workqueue subsystem initialization and
* invoked after SMP and topology information are fully initialized. It
* initializes the unbound CPU pods accordingly.
*/
......@@ -6870,6 +7837,8 @@ void __init workqueue_init_topology(void)
init_pod_type(&wq_pod_types[WQ_AFFN_CACHE], cpus_share_cache);
init_pod_type(&wq_pod_types[WQ_AFFN_NUMA], cpus_share_numa);
wq_topo_initialized = true;
mutex_lock(&wq_pool_mutex);
/*
......@@ -6878,8 +7847,12 @@ void __init workqueue_init_topology(void)
* combinations to apply per-pod sharing.
*/
list_for_each_entry(wq, &workqueues, list) {
for_each_online_cpu(cpu) {
for_each_online_cpu(cpu)
wq_update_pod(wq, cpu, cpu, true);
if (wq->flags & WQ_UNBOUND) {
mutex_lock(&wq->mutex);
wq_update_node_max_active(wq, -1);
mutex_unlock(&wq->mutex);
}
}
......
......@@ -199,7 +199,11 @@ pub fn enqueue<W, const ID: u64>(&self, w: W) -> W::EnqueueOutput
// stay valid until we call the function pointer in the `work_struct`, so the access is ok.
unsafe {
w.__enqueue(move |work_ptr| {
bindings::queue_work_on(bindings::WORK_CPU_UNBOUND as _, queue_ptr, work_ptr)
bindings::queue_work_on(
bindings::wq_misc_consts_WORK_CPU_UNBOUND as _,
queue_ptr,
work_ptr,
)
})
}
}
......
......@@ -50,6 +50,7 @@ import drgn
from drgn.helpers.linux.list import list_for_each_entry,list_empty
from drgn.helpers.linux.percpu import per_cpu_ptr
from drgn.helpers.linux.cpumask import for_each_cpu,for_each_possible_cpu
from drgn.helpers.linux.nodemask import for_each_node
from drgn.helpers.linux.idr import idr_for_each
import argparse
......@@ -75,6 +76,22 @@ def cpumask_str(cpumask):
output += f'{v:08x}'
return output.strip()
wq_type_len = 9
def wq_type_str(wq):
if wq.flags & WQ_BH:
return f'{"bh":{wq_type_len}}'
elif wq.flags & WQ_UNBOUND:
if wq.flags & WQ_ORDERED:
return f'{"ordered":{wq_type_len}}'
else:
if wq.unbound_attrs.affn_strict:
return f'{"unbound,S":{wq_type_len}}'
else:
return f'{"unbound":{wq_type_len}}'
else:
return f'{"percpu":{wq_type_len}}'
worker_pool_idr = prog['worker_pool_idr']
workqueues = prog['workqueues']
wq_unbound_cpumask = prog['wq_unbound_cpumask']
......@@ -82,6 +99,7 @@ wq_pod_types = prog['wq_pod_types']
wq_affn_dfl = prog['wq_affn_dfl']
wq_affn_names = prog['wq_affn_names']
WQ_BH = prog['WQ_BH']
WQ_UNBOUND = prog['WQ_UNBOUND']
WQ_ORDERED = prog['__WQ_ORDERED']
WQ_MEM_RECLAIM = prog['WQ_MEM_RECLAIM']
......@@ -92,6 +110,11 @@ WQ_AFFN_CACHE = prog['WQ_AFFN_CACHE']
WQ_AFFN_NUMA = prog['WQ_AFFN_NUMA']
WQ_AFFN_SYSTEM = prog['WQ_AFFN_SYSTEM']
POOL_BH = prog['POOL_BH']
WQ_NAME_LEN = prog['WQ_NAME_LEN'].value_()
cpumask_str_len = len(cpumask_str(wq_unbound_cpumask))
print('Affinity Scopes')
print('===============')
......@@ -133,10 +156,12 @@ for pi, pool in idr_for_each(worker_pool_idr):
for pi, pool in idr_for_each(worker_pool_idr):
pool = drgn.Object(prog, 'struct worker_pool', address=pool)
print(f'pool[{pi:0{max_pool_id_len}}] ref={pool.refcnt.value_():{max_ref_len}} nice={pool.attrs.nice.value_():3} ', end='')
print(f'pool[{pi:0{max_pool_id_len}}] flags=0x{pool.flags.value_():02x} ref={pool.refcnt.value_():{max_ref_len}} nice={pool.attrs.nice.value_():3} ', end='')
print(f'idle/workers={pool.nr_idle.value_():3}/{pool.nr_workers.value_():3} ', end='')
if pool.cpu >= 0:
print(f'cpu={pool.cpu.value_():3}', end='')
if pool.flags & POOL_BH:
print(' bh', end='')
else:
print(f'cpus={cpumask_str(pool.attrs.cpumask)}', end='')
print(f' pod_cpus={cpumask_str(pool.attrs.__pod_cpumask)}', end='')
......@@ -148,24 +173,13 @@ print('')
print('Workqueue CPU -> pool')
print('=====================')
print('[ workqueue \ type CPU', end='')
print(f'[{"workqueue":^{WQ_NAME_LEN-2}}\\ {"type CPU":{wq_type_len}}', end='')
for cpu in for_each_possible_cpu(prog):
print(f' {cpu:{max_pool_id_len}}', end='')
print(' dfl]')
for wq in list_for_each_entry('struct workqueue_struct', workqueues.address_of_(), 'list'):
print(f'{wq.name.string_().decode()[-24:]:24}', end='')
if wq.flags & WQ_UNBOUND:
if wq.flags & WQ_ORDERED:
print(' ordered ', end='')
else:
print(' unbound', end='')
if wq.unbound_attrs.affn_strict:
print(',S ', end='')
else:
print(' ', end='')
else:
print(' percpu ', end='')
print(f'{wq.name.string_().decode():{WQ_NAME_LEN}} {wq_type_str(wq):10}', end='')
for cpu in for_each_possible_cpu(prog):
pool_id = per_cpu_ptr(wq.cpu_pwq, cpu)[0].pool.id.value_()
......@@ -175,3 +189,65 @@ for wq in list_for_each_entry('struct workqueue_struct', workqueues.address_of_(
if wq.flags & WQ_UNBOUND:
print(f' {wq.dfl_pwq.pool.id.value_():{max_pool_id_len}}', end='')
print('')
print('')
print('Workqueue -> rescuer')
print('====================')
ucpus_len = max(cpumask_str_len, len("unbound_cpus"))
rcpus_len = max(cpumask_str_len, len("rescuer_cpus"))
print(f'[{"workqueue":^{WQ_NAME_LEN-2}}\\ {"unbound_cpus":{ucpus_len}} pid {"rescuer_cpus":{rcpus_len}} ]')
for wq in list_for_each_entry('struct workqueue_struct', workqueues.address_of_(), 'list'):
if not (wq.flags & WQ_MEM_RECLAIM):
continue
print(f'{wq.name.string_().decode():{WQ_NAME_LEN}}', end='')
if wq.unbound_attrs.value_() != 0:
print(f' {cpumask_str(wq.unbound_attrs.cpumask):{ucpus_len}}', end='')
else:
print(f' {"":{ucpus_len}}', end='')
print(f' {wq.rescuer.task.pid.value_():6}', end='')
print(f' {cpumask_str(wq.rescuer.task.cpus_ptr):{rcpus_len}}', end='')
print('')
print('')
print('Unbound workqueue -> node_nr/max_active')
print('=======================================')
if 'node_to_cpumask_map' in prog:
__cpu_online_mask = prog['__cpu_online_mask']
node_to_cpumask_map = prog['node_to_cpumask_map']
nr_node_ids = prog['nr_node_ids'].value_()
print(f'online_cpus={cpumask_str(__cpu_online_mask.address_of_())}')
for node in for_each_node():
print(f'NODE[{node:02}]={cpumask_str(node_to_cpumask_map[node])}')
print('')
print(f'[{"workqueue":^{WQ_NAME_LEN-2}}\\ min max', end='')
first = True
for node in for_each_node():
if first:
print(f' NODE {node}', end='')
first = False
else:
print(f' {node:7}', end='')
print(f' {"dfl":>7} ]')
print('')
for wq in list_for_each_entry('struct workqueue_struct', workqueues.address_of_(), 'list'):
if not (wq.flags & WQ_UNBOUND):
continue
print(f'{wq.name.string_().decode():{WQ_NAME_LEN}} ', end='')
print(f'{wq.min_active.value_():3} {wq.max_active.value_():3}', end='')
for node in for_each_node():
nna = wq.node_nr_active[node]
print(f' {nna.nr.counter.value_():3}/{nna.max.value_():3}', end='')
nna = wq.node_nr_active[nr_node_ids]
print(f' {nna.nr.counter.value_():3}/{nna.max.value_():3}')
else:
printf(f'node_to_cpumask_map not present, is NUMA enabled?')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment