Commit 521d6a73 authored by Pavel Begunkov's avatar Pavel Begunkov Committed by Jens Axboe

io_uring: cancel sqpoll via task_work

1) The first problem is io_uring_cancel_sqpoll() ->
io_uring_cancel_task_requests() basically doing park(); park(); and so
hanging.

2) Another one is more subtle, when the master task is doing cancellations,
but SQPOLL task submits in-between the end of the cancellation but
before finish() requests taking a ref to the ctx, and so eternally
locking it up.

3) Yet another is a dying SQPOLL task doing io_uring_cancel_sqpoll() and
same io_uring_cancel_sqpoll() from the owner task, they race for
tctx->wait events. And there probably more of them.

Instead do SQPOLL cancellations from within SQPOLL task context via
task_work, see io_sqpoll_cancel_sync(). With that we don't need temporal
park()/unpark() during cancellation, which is ugly, subtle and anyway
doesn't allow to do io_run_task_work() properly.

io_uring_cancel_sqpoll() is called only from SQPOLL task context and
under sqd locking, so all parking is removed from there. And so,
io_sq_thread_[un]park() and io_sq_thread_stop() are not used now by
SQPOLL task, and that spare us from some headache.

Also remove ctx->sqd_list early to avoid 2). And kill tctx->sqpoll,
which is not used anymore.
Signed-off-by: default avatarPavel Begunkov <asml.silence@gmail.com>
Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent 26984fbf
...@@ -6665,6 +6665,7 @@ static int io_sq_thread(void *data) ...@@ -6665,6 +6665,7 @@ static int io_sq_thread(void *data)
up_read(&sqd->rw_lock); up_read(&sqd->rw_lock);
cond_resched(); cond_resched();
down_read(&sqd->rw_lock); down_read(&sqd->rw_lock);
io_run_task_work();
timeout = jiffies + sqd->sq_thread_idle; timeout = jiffies + sqd->sq_thread_idle;
continue; continue;
} }
...@@ -6720,18 +6721,22 @@ static int io_sq_thread(void *data) ...@@ -6720,18 +6721,22 @@ static int io_sq_thread(void *data)
finish_wait(&sqd->wait, &wait); finish_wait(&sqd->wait, &wait);
timeout = jiffies + sqd->sq_thread_idle; timeout = jiffies + sqd->sq_thread_idle;
} }
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
io_uring_cancel_sqpoll(ctx);
up_read(&sqd->rw_lock); up_read(&sqd->rw_lock);
down_write(&sqd->rw_lock);
/*
* someone may have parked and added a cancellation task_work, run
* it first because we don't want it in io_uring_cancel_sqpoll()
*/
io_run_task_work(); io_run_task_work();
down_write(&sqd->rw_lock); list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
io_uring_cancel_sqpoll(ctx);
sqd->thread = NULL; sqd->thread = NULL;
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
io_ring_set_wakeup_flag(ctx); io_ring_set_wakeup_flag(ctx);
up_write(&sqd->rw_lock); up_write(&sqd->rw_lock);
io_run_task_work();
complete(&sqd->exited); complete(&sqd->exited);
do_exit(0); do_exit(0);
} }
...@@ -7033,8 +7038,8 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx) ...@@ -7033,8 +7038,8 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
static void io_sq_thread_unpark(struct io_sq_data *sqd) static void io_sq_thread_unpark(struct io_sq_data *sqd)
__releases(&sqd->rw_lock) __releases(&sqd->rw_lock)
{ {
if (sqd->thread == current) WARN_ON_ONCE(sqd->thread == current);
return;
clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state); clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
up_write(&sqd->rw_lock); up_write(&sqd->rw_lock);
} }
...@@ -7042,8 +7047,8 @@ static void io_sq_thread_unpark(struct io_sq_data *sqd) ...@@ -7042,8 +7047,8 @@ static void io_sq_thread_unpark(struct io_sq_data *sqd)
static void io_sq_thread_park(struct io_sq_data *sqd) static void io_sq_thread_park(struct io_sq_data *sqd)
__acquires(&sqd->rw_lock) __acquires(&sqd->rw_lock)
{ {
if (sqd->thread == current) WARN_ON_ONCE(sqd->thread == current);
return;
set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state); set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
down_write(&sqd->rw_lock); down_write(&sqd->rw_lock);
/* set again for consistency, in case concurrent parks are happening */ /* set again for consistency, in case concurrent parks are happening */
...@@ -7054,8 +7059,8 @@ static void io_sq_thread_park(struct io_sq_data *sqd) ...@@ -7054,8 +7059,8 @@ static void io_sq_thread_park(struct io_sq_data *sqd)
static void io_sq_thread_stop(struct io_sq_data *sqd) static void io_sq_thread_stop(struct io_sq_data *sqd)
{ {
if (test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state)) WARN_ON_ONCE(sqd->thread == current);
return;
down_write(&sqd->rw_lock); down_write(&sqd->rw_lock);
set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state); set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
if (sqd->thread) if (sqd->thread)
...@@ -7078,7 +7083,7 @@ static void io_sq_thread_finish(struct io_ring_ctx *ctx) ...@@ -7078,7 +7083,7 @@ static void io_sq_thread_finish(struct io_ring_ctx *ctx)
if (sqd) { if (sqd) {
io_sq_thread_park(sqd); io_sq_thread_park(sqd);
list_del(&ctx->sqd_list); list_del_init(&ctx->sqd_list);
io_sqd_update_thread_idle(sqd); io_sqd_update_thread_idle(sqd);
io_sq_thread_unpark(sqd); io_sq_thread_unpark(sqd);
...@@ -7760,7 +7765,6 @@ static int io_uring_alloc_task_context(struct task_struct *task, ...@@ -7760,7 +7765,6 @@ static int io_uring_alloc_task_context(struct task_struct *task,
init_waitqueue_head(&tctx->wait); init_waitqueue_head(&tctx->wait);
tctx->last = NULL; tctx->last = NULL;
atomic_set(&tctx->in_idle, 0); atomic_set(&tctx->in_idle, 0);
tctx->sqpoll = false;
task->io_uring = tctx; task->io_uring = tctx;
spin_lock_init(&tctx->task_lock); spin_lock_init(&tctx->task_lock);
INIT_WQ_LIST(&tctx->task_list); INIT_WQ_LIST(&tctx->task_list);
...@@ -8719,43 +8723,12 @@ static void io_uring_cancel_files(struct io_ring_ctx *ctx, ...@@ -8719,43 +8723,12 @@ static void io_uring_cancel_files(struct io_ring_ctx *ctx,
io_uring_try_cancel_requests(ctx, task, files); io_uring_try_cancel_requests(ctx, task, files);
if (ctx->sq_data)
io_sq_thread_unpark(ctx->sq_data);
prepare_to_wait(&task->io_uring->wait, &wait, prepare_to_wait(&task->io_uring->wait, &wait,
TASK_UNINTERRUPTIBLE); TASK_UNINTERRUPTIBLE);
if (inflight == io_uring_count_inflight(ctx, task, files)) if (inflight == io_uring_count_inflight(ctx, task, files))
schedule(); schedule();
finish_wait(&task->io_uring->wait, &wait); finish_wait(&task->io_uring->wait, &wait);
if (ctx->sq_data)
io_sq_thread_park(ctx->sq_data);
}
}
/*
* We need to iteratively cancel requests, in case a request has dependent
* hard links. These persist even for failure of cancelations, hence keep
* looping until none are found.
*/
static void io_uring_cancel_task_requests(struct io_ring_ctx *ctx,
struct files_struct *files)
{
struct task_struct *task = current;
if ((ctx->flags & IORING_SETUP_SQPOLL) && ctx->sq_data) {
io_sq_thread_park(ctx->sq_data);
task = ctx->sq_data->thread;
if (task)
atomic_inc(&task->io_uring->in_idle);
} }
io_uring_cancel_files(ctx, task, files);
if (!files)
io_uring_try_cancel_requests(ctx, task, NULL);
if (task)
atomic_dec(&task->io_uring->in_idle);
if (ctx->sq_data)
io_sq_thread_unpark(ctx->sq_data);
} }
/* /*
...@@ -8796,15 +8769,6 @@ static int io_uring_add_task_file(struct io_ring_ctx *ctx) ...@@ -8796,15 +8769,6 @@ static int io_uring_add_task_file(struct io_ring_ctx *ctx)
} }
tctx->last = ctx; tctx->last = ctx;
} }
/*
* This is race safe in that the task itself is doing this, hence it
* cannot be going through the exit/cancel paths at the same time.
* This cannot be modified while exit/cancel is running.
*/
if (!tctx->sqpoll && (ctx->flags & IORING_SETUP_SQPOLL))
tctx->sqpoll = true;
return 0; return 0;
} }
...@@ -8847,6 +8811,44 @@ static void io_uring_clean_tctx(struct io_uring_task *tctx) ...@@ -8847,6 +8811,44 @@ static void io_uring_clean_tctx(struct io_uring_task *tctx)
} }
} }
static s64 tctx_inflight(struct io_uring_task *tctx)
{
return percpu_counter_sum(&tctx->inflight);
}
static void io_sqpoll_cancel_cb(struct callback_head *cb)
{
struct io_tctx_exit *work = container_of(cb, struct io_tctx_exit, task_work);
struct io_ring_ctx *ctx = work->ctx;
struct io_sq_data *sqd = ctx->sq_data;
if (sqd->thread)
io_uring_cancel_sqpoll(ctx);
complete(&work->completion);
}
static void io_sqpoll_cancel_sync(struct io_ring_ctx *ctx)
{
struct io_sq_data *sqd = ctx->sq_data;
struct io_tctx_exit work = { .ctx = ctx, };
struct task_struct *task;
io_sq_thread_park(sqd);
list_del_init(&ctx->sqd_list);
io_sqd_update_thread_idle(sqd);
task = sqd->thread;
if (task) {
init_completion(&work.completion);
init_task_work(&work.task_work, io_sqpoll_cancel_cb);
WARN_ON_ONCE(task_work_add(task, &work.task_work, TWA_SIGNAL));
wake_up_process(task);
}
io_sq_thread_unpark(sqd);
if (task)
wait_for_completion(&work.completion);
}
void __io_uring_files_cancel(struct files_struct *files) void __io_uring_files_cancel(struct files_struct *files)
{ {
struct io_uring_task *tctx = current->io_uring; struct io_uring_task *tctx = current->io_uring;
...@@ -8855,41 +8857,40 @@ void __io_uring_files_cancel(struct files_struct *files) ...@@ -8855,41 +8857,40 @@ void __io_uring_files_cancel(struct files_struct *files)
/* make sure overflow events are dropped */ /* make sure overflow events are dropped */
atomic_inc(&tctx->in_idle); atomic_inc(&tctx->in_idle);
xa_for_each(&tctx->xa, index, node) xa_for_each(&tctx->xa, index, node) {
io_uring_cancel_task_requests(node->ctx, files); struct io_ring_ctx *ctx = node->ctx;
if (ctx->sq_data) {
io_sqpoll_cancel_sync(ctx);
continue;
}
io_uring_cancel_files(ctx, current, files);
if (!files)
io_uring_try_cancel_requests(ctx, current, NULL);
}
atomic_dec(&tctx->in_idle); atomic_dec(&tctx->in_idle);
if (files) if (files)
io_uring_clean_tctx(tctx); io_uring_clean_tctx(tctx);
} }
static s64 tctx_inflight(struct io_uring_task *tctx) /* should only be called by SQPOLL task */
{
return percpu_counter_sum(&tctx->inflight);
}
static void io_uring_cancel_sqpoll(struct io_ring_ctx *ctx) static void io_uring_cancel_sqpoll(struct io_ring_ctx *ctx)
{ {
struct io_sq_data *sqd = ctx->sq_data; struct io_sq_data *sqd = ctx->sq_data;
struct io_uring_task *tctx; struct io_uring_task *tctx = current->io_uring;
s64 inflight; s64 inflight;
DEFINE_WAIT(wait); DEFINE_WAIT(wait);
if (!sqd) WARN_ON_ONCE(!sqd || ctx->sq_data->thread != current);
return;
io_sq_thread_park(sqd);
if (!sqd->thread || !sqd->thread->io_uring) {
io_sq_thread_unpark(sqd);
return;
}
tctx = ctx->sq_data->thread->io_uring;
atomic_inc(&tctx->in_idle); atomic_inc(&tctx->in_idle);
do { do {
/* read completions before cancelations */ /* read completions before cancelations */
inflight = tctx_inflight(tctx); inflight = tctx_inflight(tctx);
if (!inflight) if (!inflight)
break; break;
io_uring_cancel_task_requests(ctx, NULL); io_uring_try_cancel_requests(ctx, current, NULL);
prepare_to_wait(&tctx->wait, &wait, TASK_UNINTERRUPTIBLE); prepare_to_wait(&tctx->wait, &wait, TASK_UNINTERRUPTIBLE);
/* /*
...@@ -8902,7 +8903,6 @@ static void io_uring_cancel_sqpoll(struct io_ring_ctx *ctx) ...@@ -8902,7 +8903,6 @@ static void io_uring_cancel_sqpoll(struct io_ring_ctx *ctx)
finish_wait(&tctx->wait, &wait); finish_wait(&tctx->wait, &wait);
} while (1); } while (1);
atomic_dec(&tctx->in_idle); atomic_dec(&tctx->in_idle);
io_sq_thread_unpark(sqd);
} }
/* /*
...@@ -8917,15 +8917,6 @@ void __io_uring_task_cancel(void) ...@@ -8917,15 +8917,6 @@ void __io_uring_task_cancel(void)
/* make sure overflow events are dropped */ /* make sure overflow events are dropped */
atomic_inc(&tctx->in_idle); atomic_inc(&tctx->in_idle);
if (tctx->sqpoll) {
struct io_tctx_node *node;
unsigned long index;
xa_for_each(&tctx->xa, index, node)
io_uring_cancel_sqpoll(node->ctx);
}
do { do {
/* read completions before cancelations */ /* read completions before cancelations */
inflight = tctx_inflight(tctx); inflight = tctx_inflight(tctx);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment