Commit a85381d8, authored by Pavel Begunkov, committed by Jens Axboe

io_uring: skip overflow CQE posting for dying ring

After io_ring_ctx_wait_and_kill() is called there should be no users
poking into rings and so there is no need to post CQEs. So, instead of
trying to post overflowed CQEs into the CQ, drop them. Also, do it
in io_ring_exit_work() in a loop to reduce the number of contexts it
can be executed from and even when it struggles to quiesce the ring we
won't be leaving memory allocated for longer than needed.
Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Link: https://lore.kernel.org/r/26d13751155a735a3029e24f8d9ca992f810419d.1670384893.git.asml.silence@gmail.com
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent 4c979eae
...@@ -600,12 +600,30 @@ void io_cq_unlock_post(struct io_ring_ctx *ctx) ...@@ -600,12 +600,30 @@ void io_cq_unlock_post(struct io_ring_ctx *ctx)
} }
/* Returns true if there are no backlogged entries after the flush */ /* Returns true if there are no backlogged entries after the flush */
static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force) static void io_cqring_overflow_kill(struct io_ring_ctx *ctx)
{
/*
 * io_cqring_overflow_kill(): discard every entry queued on
 * ctx->cq_overflow_list without copying any of them into the CQ ring.
 * Returns nothing.
 */
struct io_overflow_cqe *ocqe;
LIST_HEAD(list);
/* Move the whole backlog onto a local list while holding the CQ lock. */
io_cq_lock(ctx);
list_splice_init(&ctx->cq_overflow_list, &list);
/* The backlog has been taken off ctx, so clear the overflow flag. */
clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
io_cq_unlock(ctx);
/* Free the detached entries one by one, after the lock was released. */
while (!list_empty(&list)) {
ocqe = list_first_entry(&list, struct io_overflow_cqe, list);
list_del(&ocqe->list);
kfree(ocqe);
}
}
/* Returns true if there are no backlogged entries after the flush */
static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx)
{ {
bool all_flushed; bool all_flushed;
size_t cqe_size = sizeof(struct io_uring_cqe); size_t cqe_size = sizeof(struct io_uring_cqe);
if (!force && __io_cqring_events(ctx) == ctx->cq_entries) if (__io_cqring_events(ctx) == ctx->cq_entries)
return false; return false;
if (ctx->flags & IORING_SETUP_CQE32) if (ctx->flags & IORING_SETUP_CQE32)
...@@ -616,15 +634,11 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force) ...@@ -616,15 +634,11 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
struct io_uring_cqe *cqe = io_get_cqe_overflow(ctx, true); struct io_uring_cqe *cqe = io_get_cqe_overflow(ctx, true);
struct io_overflow_cqe *ocqe; struct io_overflow_cqe *ocqe;
if (!cqe && !force) if (!cqe)
break; break;
ocqe = list_first_entry(&ctx->cq_overflow_list, ocqe = list_first_entry(&ctx->cq_overflow_list,
struct io_overflow_cqe, list); struct io_overflow_cqe, list);
if (cqe) memcpy(cqe, &ocqe->cqe, cqe_size);
memcpy(cqe, &ocqe->cqe, cqe_size);
else
io_account_cq_overflow(ctx);
list_del(&ocqe->list); list_del(&ocqe->list);
kfree(ocqe); kfree(ocqe);
} }
...@@ -647,7 +661,7 @@ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx) ...@@ -647,7 +661,7 @@ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx)
/* iopoll syncs against uring_lock, not completion_lock */ /* iopoll syncs against uring_lock, not completion_lock */
if (ctx->flags & IORING_SETUP_IOPOLL) if (ctx->flags & IORING_SETUP_IOPOLL)
mutex_lock(&ctx->uring_lock); mutex_lock(&ctx->uring_lock);
ret = __io_cqring_overflow_flush(ctx, false); ret = __io_cqring_overflow_flush(ctx);
if (ctx->flags & IORING_SETUP_IOPOLL) if (ctx->flags & IORING_SETUP_IOPOLL)
mutex_unlock(&ctx->uring_lock); mutex_unlock(&ctx->uring_lock);
} }
...@@ -1467,7 +1481,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min) ...@@ -1467,7 +1481,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
check_cq = READ_ONCE(ctx->check_cq); check_cq = READ_ONCE(ctx->check_cq);
if (unlikely(check_cq)) { if (unlikely(check_cq)) {
if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)) if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
__io_cqring_overflow_flush(ctx, false); __io_cqring_overflow_flush(ctx);
/* /*
* Similarly do not spin if we have not informed the user of any * Similarly do not spin if we have not informed the user of any
* dropped CQE. * dropped CQE.
...@@ -2635,8 +2649,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) ...@@ -2635,8 +2649,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
__io_sqe_buffers_unregister(ctx); __io_sqe_buffers_unregister(ctx);
if (ctx->file_data) if (ctx->file_data)
__io_sqe_files_unregister(ctx); __io_sqe_files_unregister(ctx);
if (ctx->rings) io_cqring_overflow_kill(ctx);
__io_cqring_overflow_flush(ctx, true);
io_eventfd_unregister(ctx); io_eventfd_unregister(ctx);
io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free); io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free);
io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free); io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
...@@ -2777,6 +2790,12 @@ static __cold void io_ring_exit_work(struct work_struct *work) ...@@ -2777,6 +2790,12 @@ static __cold void io_ring_exit_work(struct work_struct *work)
* as nobody else will be looking for them. * as nobody else will be looking for them.
*/ */
do { do {
if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
mutex_lock(&ctx->uring_lock);
io_cqring_overflow_kill(ctx);
mutex_unlock(&ctx->uring_lock);
}
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
io_move_task_work_from_local(ctx); io_move_task_work_from_local(ctx);
...@@ -2842,8 +2861,6 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) ...@@ -2842,8 +2861,6 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
mutex_lock(&ctx->uring_lock); mutex_lock(&ctx->uring_lock);
percpu_ref_kill(&ctx->refs); percpu_ref_kill(&ctx->refs);
if (ctx->rings)
__io_cqring_overflow_flush(ctx, true);
xa_for_each(&ctx->personalities, index, creds) xa_for_each(&ctx->personalities, index, creds)
io_unregister_personality(ctx, index); io_unregister_personality(ctx, index);
if (ctx->rings) if (ctx->rings)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment