Commit 4fb6ac32 authored by Jens Axboe's avatar Jens Axboe

io-wq: improve manager/worker handling over exec

exec will cancel any threads, including the ones that io-wq is using. This
isn't a problem, in fact we'd prefer it to be that way since it means we
know that any async work cancels naturally without having to handle it
proactively.

But it does mean that we need to setup a new manager, as the manager and
workers are gone. Handle this at queue time, and cancel work if we fail.
Since the manager can go away without us noticing, ensure that the manager
itself holds a reference to the 'wq' as well. Rename io_wq_destroy() to
io_wq_put() to reflect that.

In the future we can now simplify exec cancelation handling, for now just
leave it the same.
Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent eb85890b
...@@ -189,8 +189,7 @@ static void io_worker_exit(struct io_worker *worker) ...@@ -189,8 +189,7 @@ static void io_worker_exit(struct io_worker *worker)
raw_spin_unlock_irq(&wqe->lock); raw_spin_unlock_irq(&wqe->lock);
kfree_rcu(worker, rcu); kfree_rcu(worker, rcu);
if (refcount_dec_and_test(&wqe->wq->refs)) io_wq_put(wqe->wq);
complete(&wqe->wq->done);
} }
static inline bool io_wqe_run_queue(struct io_wqe *wqe) static inline bool io_wqe_run_queue(struct io_wqe *wqe)
...@@ -654,8 +653,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) ...@@ -654,8 +653,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
else else
pid = io_wq_fork_thread(task_thread_unbound, worker); pid = io_wq_fork_thread(task_thread_unbound, worker);
if (pid < 0) { if (pid < 0) {
if (refcount_dec_and_test(&wq->refs)) io_wq_put(wq);
complete(&wq->done);
kfree(worker); kfree(worker);
return false; return false;
} }
...@@ -754,11 +752,6 @@ static int io_wq_manager(void *data) ...@@ -754,11 +752,6 @@ static int io_wq_manager(void *data)
io_wq_check_workers(wq); io_wq_check_workers(wq);
if (refcount_dec_and_test(&wq->refs)) {
wq->manager = NULL;
complete(&wq->done);
do_exit(0);
}
/* if ERROR is set and we get here, we have workers to wake */ /* if ERROR is set and we get here, we have workers to wake */
if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) { if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
rcu_read_lock(); rcu_read_lock();
...@@ -767,6 +760,7 @@ static int io_wq_manager(void *data) ...@@ -767,6 +760,7 @@ static int io_wq_manager(void *data)
rcu_read_unlock(); rcu_read_unlock();
} }
wq->manager = NULL; wq->manager = NULL;
io_wq_put(wq);
do_exit(0); do_exit(0);
} }
...@@ -801,12 +795,40 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work) ...@@ -801,12 +795,40 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
wq_list_add_after(&work->list, &tail->list, &wqe->work_list); wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
} }
static int io_wq_fork_manager(struct io_wq *wq)
{
int ret;
if (wq->manager)
return 0;
clear_bit(IO_WQ_BIT_EXIT, &wq->state);
refcount_inc(&wq->refs);
current->flags |= PF_IO_WORKER;
ret = io_wq_fork_thread(io_wq_manager, wq);
current->flags &= ~PF_IO_WORKER;
if (ret >= 0) {
wait_for_completion(&wq->done);
return 0;
}
io_wq_put(wq);
return ret;
}
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{ {
struct io_wqe_acct *acct = io_work_get_acct(wqe, work); struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
int work_flags; int work_flags;
unsigned long flags; unsigned long flags;
/* Can only happen if manager creation fails after exec */
if (unlikely(io_wq_fork_manager(wqe->wq))) {
work->flags |= IO_WQ_WORK_CANCEL;
wqe->wq->do_work(work);
return;
}
work_flags = work->flags; work_flags = work->flags;
raw_spin_lock_irqsave(&wqe->lock, flags); raw_spin_lock_irqsave(&wqe->lock, flags);
io_wqe_insert_work(wqe, work); io_wqe_insert_work(wqe, work);
...@@ -1034,16 +1056,11 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) ...@@ -1034,16 +1056,11 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
init_completion(&wq->done); init_completion(&wq->done);
refcount_set(&wq->refs, 1); refcount_set(&wq->refs, 1);
current->flags |= PF_IO_WORKER; ret = io_wq_fork_manager(wq);
ret = io_wq_fork_thread(io_wq_manager, wq); if (!ret)
current->flags &= ~PF_IO_WORKER;
if (ret >= 0) {
wait_for_completion(&wq->done);
return wq; return wq;
}
if (refcount_dec_and_test(&wq->refs)) io_wq_put(wq);
complete(&wq->done);
io_wq_put_hash(data->hash); io_wq_put_hash(data->hash);
err: err:
cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
...@@ -1056,7 +1073,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) ...@@ -1056,7 +1073,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
return ERR_PTR(ret); return ERR_PTR(ret);
} }
void io_wq_destroy(struct io_wq *wq) static void io_wq_destroy(struct io_wq *wq)
{ {
int node; int node;
...@@ -1071,8 +1088,6 @@ void io_wq_destroy(struct io_wq *wq) ...@@ -1071,8 +1088,6 @@ void io_wq_destroy(struct io_wq *wq)
io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL); io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
rcu_read_unlock(); rcu_read_unlock();
wait_for_completion(&wq->done);
spin_lock_irq(&wq->hash->wait.lock); spin_lock_irq(&wq->hash->wait.lock);
for_each_node(node) { for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node]; struct io_wqe *wqe = wq->wqes[node];
...@@ -1084,6 +1099,13 @@ void io_wq_destroy(struct io_wq *wq) ...@@ -1084,6 +1099,13 @@ void io_wq_destroy(struct io_wq *wq)
io_wq_put_hash(wq->hash); io_wq_put_hash(wq->hash);
kfree(wq->wqes); kfree(wq->wqes);
kfree(wq); kfree(wq);
}
void io_wq_put(struct io_wq *wq)
{
if (refcount_dec_and_test(&wq->refs))
io_wq_destroy(wq);
} }
static bool io_wq_worker_affinity(struct io_worker *worker, void *data) static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
......
...@@ -113,7 +113,7 @@ struct io_wq_data { ...@@ -113,7 +113,7 @@ struct io_wq_data {
}; };
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data); struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data);
void io_wq_destroy(struct io_wq *wq); void io_wq_put(struct io_wq *wq);
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work); void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work);
void io_wq_hash_work(struct io_wq_work *work, void *val); void io_wq_hash_work(struct io_wq_work *work, void *val);
......
...@@ -2024,7 +2024,7 @@ static void __io_req_task_submit(struct io_kiocb *req) ...@@ -2024,7 +2024,7 @@ static void __io_req_task_submit(struct io_kiocb *req)
/* ctx stays valid until unlock, even if we drop all ours ctx->refs */ /* ctx stays valid until unlock, even if we drop all ours ctx->refs */
mutex_lock(&ctx->uring_lock); mutex_lock(&ctx->uring_lock);
if (!ctx->sqo_dead && !(current->flags & PF_EXITING)) if (!ctx->sqo_dead && !(current->flags & PF_EXITING) && !current->in_execve)
__io_queue_sqe(req); __io_queue_sqe(req);
else else
__io_req_task_cancel(req, -EFAULT); __io_req_task_cancel(req, -EFAULT);
...@@ -8821,7 +8821,7 @@ void __io_uring_files_cancel(struct files_struct *files) ...@@ -8821,7 +8821,7 @@ void __io_uring_files_cancel(struct files_struct *files)
if (files) { if (files) {
io_uring_remove_task_files(tctx); io_uring_remove_task_files(tctx);
if (tctx->io_wq) { if (tctx->io_wq) {
io_wq_destroy(tctx->io_wq); io_wq_put(tctx->io_wq);
tctx->io_wq = NULL; tctx->io_wq = NULL;
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment