Commit 4ea15b56 authored by Pavel Begunkov, committed by Jens Axboe

io_uring/rsrc: use wq for quiescing

Replace completions with waitqueues for rsrc data quiesce; the main
wakeup condition is when the data refs hit zero. Note that data refs
are only changed under ->uring_lock, so we prepare to wait before
mutex_unlock() and recheck the refs after taking the lock back. This
change will be needed in the next patch.
Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Link: https://lore.kernel.org/r/1d0dbc74b3b4fd67c8f01819e680c5e0da252956.1681395792.git.asml.silence@gmail.com
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent eef81fca
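
The pattern being adopted is the standard waitqueue quiesce loop: the waiter
queues itself with prepare_to_wait() while still holding ->uring_lock, drops
the lock and sleeps, then retakes the lock and rechecks the refs; the waker
only drops refs under the same lock and calls wake_up_all() once they hit
zero, so no wakeup can be lost in the unlock/sleep window. Below is a minimal
sketch of that pattern, not the patch itself: the names (my_data, quiesce_wq,
refs, lock) are made up for illustration, and the signal/task-work handling of
the real io_rsrc_ref_quiesce() is omitted.

#include <linux/mutex.h>
#include <linux/sched.h>
#include <linux/wait.h>

struct my_data {
	struct mutex		lock;		/* stands in for ->uring_lock */
	struct wait_queue_head	quiesce_wq;	/* stands in for ->rsrc_quiesce_wq */
	int			refs;		/* only modified with ->lock held */
};

static void my_data_init(struct my_data *d)
{
	mutex_init(&d->lock);
	init_waitqueue_head(&d->quiesce_wq);
	d->refs = 0;
}

/* Waiter side: block until refs drops to zero. */
static void my_quiesce(struct my_data *d)
{
	DEFINE_WAIT(we);

	mutex_lock(&d->lock);
	while (d->refs) {
		/* Get onto the waitqueue *before* dropping the lock so a
		 * wake_up_all() issued in the window cannot be missed. */
		prepare_to_wait(&d->quiesce_wq, &we, TASK_INTERRUPTIBLE);
		mutex_unlock(&d->lock);

		schedule();
		__set_current_state(TASK_RUNNING);

		/* Retake the lock; the loop condition rechecks d->refs. */
		mutex_lock(&d->lock);
	}
	finish_wait(&d->quiesce_wq, &we);
	mutex_unlock(&d->lock);
}

/* Waker side: drop one ref under the lock, wake quiescers on the last put. */
static void my_put_ref(struct my_data *d)
{
	lockdep_assert_held(&d->lock);
	if (!--d->refs)
		wake_up_all(&d->quiesce_wq);
}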
@@ -333,6 +333,7 @@ struct io_ring_ctx {
 	/* protected by ->uring_lock */
 	struct list_head	rsrc_ref_list;
 	struct io_alloc_cache	rsrc_node_cache;
+	struct wait_queue_head	rsrc_quiesce_wq;

 	struct list_head	io_buffers_pages;
...
@@ -321,6 +321,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	mutex_init(&ctx->uring_lock);
 	init_waitqueue_head(&ctx->cq_wait);
 	init_waitqueue_head(&ctx->poll_wq);
+	init_waitqueue_head(&ctx->rsrc_quiesce_wq);
 	spin_lock_init(&ctx->completion_lock);
 	spin_lock_init(&ctx->timeout_lock);
 	INIT_WQ_LIST(&ctx->iopoll_list);
...
@@ -158,6 +158,7 @@ static void io_rsrc_put_work_one(struct io_rsrc_data *rsrc_data,
 static void __io_rsrc_put_work(struct io_rsrc_node *ref_node)
 {
 	struct io_rsrc_data *rsrc_data = ref_node->rsrc_data;
+	struct io_ring_ctx *ctx = rsrc_data->ctx;
 	struct io_rsrc_put *prsrc, *tmp;

 	if (ref_node->inline_items)
@@ -171,13 +172,13 @@ static void __io_rsrc_put_work(struct io_rsrc_node *ref_node)
 	io_rsrc_node_destroy(rsrc_data->ctx, ref_node);

 	if (io_put_rsrc_data_ref(rsrc_data))
-		complete(&rsrc_data->done);
+		wake_up_all(&ctx->rsrc_quiesce_wq);
 }

 void io_wait_rsrc_data(struct io_rsrc_data *data)
 {
-	if (data && !io_put_rsrc_data_ref(data))
-		wait_for_completion(&data->done);
+	if (data)
+		WARN_ON_ONCE(!io_put_rsrc_data_ref(data));
 }

 void io_rsrc_node_destroy(struct io_ring_ctx *ctx, struct io_rsrc_node *node)
@@ -257,6 +258,7 @@ int io_rsrc_node_switch_start(struct io_ring_ctx *ctx)
 __cold static int io_rsrc_ref_quiesce(struct io_rsrc_data *data,
				      struct io_ring_ctx *ctx)
 {
+	DEFINE_WAIT(we);
 	int ret;

 	/* As we may drop ->uring_lock, other task may have started quiesce */
@@ -273,7 +275,9 @@ __cold static int io_rsrc_ref_quiesce(struct io_rsrc_data *data,
 	data->quiesce = true;
 	do {
+		prepare_to_wait(&ctx->rsrc_quiesce_wq, &we, TASK_INTERRUPTIBLE);
 		mutex_unlock(&ctx->uring_lock);
+
 		ret = io_run_task_work_sig(ctx);
 		if (ret < 0) {
 			mutex_lock(&ctx->uring_lock);
@@ -285,12 +289,15 @@ __cold static int io_rsrc_ref_quiesce(struct io_rsrc_data *data,
 			}
 			break;
 		}

-		wait_for_completion_interruptible(&data->done);
+		schedule();
+		__set_current_state(TASK_RUNNING);
 		mutex_lock(&ctx->uring_lock);
 		ret = 0;
 	} while (data->refs);
-	data->quiesce = false;

+	finish_wait(&ctx->rsrc_quiesce_wq, &we);
+	data->quiesce = false;
 	return ret;
 }
@@ -366,7 +373,6 @@ __cold static int io_rsrc_data_alloc(struct io_ring_ctx *ctx,
 			goto fail;
 		}
 	}
-	init_completion(&data->done);
 	*pdata = data;
 	return 0;
 fail:
...
@@ -35,7 +35,6 @@ struct io_rsrc_data {
 	u64				**tags;
 	unsigned int			nr;
 	rsrc_put_fn			*do_put;
-	struct completion		done;
 	int				refs;
 	bool				quiesce;
 };
...