Commit 873eaca6 authored by Tejun Heo

workqueue: Factor out work to worker assignment and collision handling

The two work execution paths in worker_thread() and rescuer_thread() use
move_linked_works() to claim work items from @pool->worklist. Once claimed,
process_scheduled_works() is called, which invokes process_one_work() on each
work item. process_one_work() then uses find_worker_executing_work() to
detect and handle collisions - situations where the work item to be executed
is still running on another worker.
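
For reference, this is roughly the collision check that currently sits at the
top of process_one_work(), condensed from the hunk removed further down:

	collision = find_worker_executing_work(pool, work);
	if (unlikely(collision)) {
		/* @work is already running elsewhere; queue it behind that worker */
		move_linked_works(work, &collision->scheduled, NULL);
		return;
	}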

This works fine, but, to improve work execution locality, we want to
establish the work to worker association earlier and know for sure that the
worker is going to execute the work once assigned, which requires performing
collision handling earlier while trying to assign the work item to the
worker.

This patch introduces assign_work() which assigns a work item to a worker
using move_linked_works() and then performs collision handling. As collisions
are now handled earlier, process_one_work() no longer needs to worry about
them.
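
On the caller side, the worker_thread() fast path then changes as in the
sketch below, condensed from the corresponding hunk in the diff:

	/* before: claim the series, then execute it unconditionally */
	move_linked_works(work, &worker->scheduled, NULL);
	process_scheduled_works(worker);

	/* after: execute only if the work wasn't punted to a colliding worker */
	if (assign_work(work, worker, NULL))
		process_scheduled_works(worker);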

After this patch, collision checks for linked work items are skipped,
which should be fine as they can't be queued multiple times concurrently.
For work items running from rescuers, the timing of collision handling may
change but the invariant that the work items go through collision handling
before starting execution does not.
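
(For reference, a linked series starts at @work and extends over consecutive
entries flagged with WORK_STRUCT_LINKED, and move_linked_works() claims the
whole series in one walk. A simplified sketch of that walk, not the exact
kernel implementation:

	list_for_each_entry_safe_from(work, n, NULL, entry) {
		list_move_tail(&work->entry, head);	/* claim this entry */
		if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
			break;				/* end of the linked series */
	}
	if (nextp)
		*nextp = n;	/* allow nesting inside an outer safe walk */
)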

This patch shouldn't cause noticeable behavior changes, especially given
that worker_thread() behavior remains the same.
Signed-off-by: Tejun Heo <tj@kernel.org>
parent 63c5484e
@@ -1025,13 +1025,10 @@ static struct worker *find_worker_executing_work(struct worker_pool *pool,
  * @head: target list to append @work to
  * @nextp: out parameter for nested worklist walking
  *
- * Schedule linked works starting from @work to @head. Work series to
- * be scheduled starts at @work and includes any consecutive work with
- * WORK_STRUCT_LINKED set in its predecessor.
- *
- * If @nextp is not NULL, it's updated to point to the next work of
- * the last scheduled work. This allows move_linked_works() to be
- * nested inside outer list_for_each_entry_safe().
+ * Schedule linked works starting from @work to @head. Work series to be
+ * scheduled starts at @work and includes any consecutive work with
+ * WORK_STRUCT_LINKED set in its predecessor. See assign_work() for details on
+ * @nextp.
  *
  * CONTEXT:
  * raw_spin_lock_irq(pool->lock).
@@ -1060,6 +1057,48 @@ static void move_linked_works(struct work_struct *work, struct list_head *head,
 		*nextp = n;
 }
 
+/**
+ * assign_work - assign a work item and its linked work items to a worker
+ * @work: work to assign
+ * @worker: worker to assign to
+ * @nextp: out parameter for nested worklist walking
+ *
+ * Assign @work and its linked work items to @worker. If @work is already being
+ * executed by another worker in the same pool, it'll be punted there.
+ *
+ * If @nextp is not NULL, it's updated to point to the next work of the last
+ * scheduled work. This allows assign_work() to be nested inside
+ * list_for_each_entry_safe().
+ *
+ * Returns %true if @work was successfully assigned to @worker. %false if @work
+ * was punted to another worker already executing it.
+ */
+static bool assign_work(struct work_struct *work, struct worker *worker,
+			struct work_struct **nextp)
+{
+	struct worker_pool *pool = worker->pool;
+	struct worker *collision;
+
+	lockdep_assert_held(&pool->lock);
+
+	/*
+	 * A single work shouldn't be executed concurrently by multiple workers.
+	 * __queue_work() ensures that @work doesn't jump to a different pool
+	 * while still running in the previous pool. Here, we should ensure that
+	 * @work is not executed concurrently by multiple workers from the same
+	 * pool. Check whether anyone is already processing the work. If so,
	 * defer the work to the currently executing one.
+	 */
+	collision = find_worker_executing_work(pool, work);
+	if (unlikely(collision)) {
+		move_linked_works(work, &collision->scheduled, nextp);
+		return false;
+	}
+
+	move_linked_works(work, &worker->scheduled, nextp);
+	return true;
+}
+
 /**
  * wake_up_worker - wake up an idle worker
  * @pool: worker pool to wake worker from
@@ -2462,7 +2501,6 @@ __acquires(&pool->lock)
 	struct pool_workqueue *pwq = get_work_pwq(work);
 	struct worker_pool *pool = worker->pool;
 	unsigned long work_data;
-	struct worker *collision;
 #ifdef CONFIG_LOCKDEP
 	/*
 	 * It is permissible to free the struct work_struct from
@@ -2479,18 +2517,6 @@ __acquires(&pool->lock)
 	WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
 		     raw_smp_processor_id() != pool->cpu);
 
-	/*
-	 * A single work shouldn't be executed concurrently by
-	 * multiple workers on a single cpu. Check whether anyone is
-	 * already processing the work. If so, defer the work to the
-	 * currently executing one.
-	 */
-	collision = find_worker_executing_work(pool, work);
-	if (unlikely(collision)) {
-		move_linked_works(work, &collision->scheduled, NULL);
-		return;
-	}
-
 	/* claim and dequeue */
 	debug_work_deactivate(work);
 	hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);
@@ -2717,8 +2743,8 @@ static int worker_thread(void *__worker)
 				list_first_entry(&pool->worklist,
 						 struct work_struct, entry);
 
-			move_linked_works(work, &worker->scheduled, NULL);
-			process_scheduled_works(worker);
+			if (assign_work(work, worker, NULL))
+				process_scheduled_works(worker);
 		} while (keep_working(pool));
 
 		worker_set_flags(worker, WORKER_PREP);
@@ -2762,7 +2788,6 @@ static int rescuer_thread(void *__rescuer)
 {
 	struct worker *rescuer = __rescuer;
 	struct workqueue_struct *wq = rescuer->rescue_wq;
-	struct list_head *scheduled = &rescuer->scheduled;
 	bool should_stop;
 
 	set_user_nice(current, RESCUER_NICE_LEVEL);
@@ -2807,15 +2832,14 @@ static int rescuer_thread(void *__rescuer)
 		 * Slurp in all works issued via this workqueue and
 		 * process'em.
 		 */
-		WARN_ON_ONCE(!list_empty(scheduled));
+		WARN_ON_ONCE(!list_empty(&rescuer->scheduled));
 		list_for_each_entry_safe(work, n, &pool->worklist, entry) {
-			if (get_work_pwq(work) == pwq) {
-				move_linked_works(work, scheduled, &n);
+			if (get_work_pwq(work) == pwq &&
+			    assign_work(work, rescuer, &n))
 				pwq->stats[PWQ_STAT_RESCUED]++;
-			}
 		}
 
-		if (!list_empty(scheduled)) {
+		if (!list_empty(&rescuer->scheduled)) {
 			process_scheduled_works(rescuer);
 
 			/*