Commit f237c30a authored by Pavel Begunkov, committed by Jens Axboe

io_uring: batch task work locking

Many task_work handlers either grab ->uring_lock, or may benefit from
having it. Move locking logic out of individual handlers to a lazy
approach controlled by tctx_task_work(), so we don't keep doing
tons of mutex lock/unlock.
Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Link: https://lore.kernel.org/r/d6a34e147f2507a2f3e2fa1e38a9c541dcad3929.1629286357.git.asml.silence@gmail.com
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent 5636c00d
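The idea in isolation: every task_work handler receives a bool *locked cookie; a handler that wants ->uring_lock asks a helper to take the mutex only if it is not already held for the current batch, and the batch runner (tctx_task_work() in this patch) drops it once after the last handler. The sketch below is only a user-space illustration of that pattern with pthreads, not kernel code; tw_lock(), run_task_work() and the handler names are made up for the example.

/*
 * Illustrative sketch only: lazy per-batch locking, modelled in user space.
 * Build with: cc -pthread sketch.c
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

static pthread_mutex_t uring_lock = PTHREAD_MUTEX_INITIALIZER;

/* Take the lock only the first time a handler in the batch asks for it. */
static void tw_lock(bool *locked)
{
	if (!*locked) {
		pthread_mutex_lock(&uring_lock);
		*locked = true;
	}
}

typedef void (*tw_func_t)(int id, bool *locked);

/* A handler that needs the lock: it requests it lazily. */
static void handler_needs_lock(int id, bool *locked)
{
	tw_lock(locked);
	printf("work %d completed under the lock\n", id);
}

/* A handler that never touches locked state. */
static void handler_lockless(int id, bool *locked)
{
	(void)locked;
	printf("work %d completed without the lock\n", id);
}

/*
 * Batch runner: the mutex is taken at most once per batch and released
 * once at the end, instead of once per handler.
 */
static void run_task_work(tw_func_t *funcs, int nr)
{
	bool locked = false;

	for (int i = 0; i < nr; i++)
		funcs[i](i, &locked);

	if (locked)
		pthread_mutex_unlock(&uring_lock);
}

int main(void)
{
	tw_func_t batch[] = {
		handler_lockless, handler_needs_lock,
		handler_needs_lock, handler_lockless,
	};

	run_task_work(batch, 4);
	return 0;
}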
@@ -775,7 +775,7 @@ struct async_poll {
 	struct io_poll_iocb	*double_poll;
 };
 
-typedef void (*io_req_tw_func_t)(struct io_kiocb *req);
+typedef void (*io_req_tw_func_t)(struct io_kiocb *req, bool *locked);
 
 struct io_task_work {
 	union {
@@ -1080,6 +1080,14 @@ struct sock *io_uring_get_socket(struct file *file)
 }
 EXPORT_SYMBOL(io_uring_get_socket);
 
+static inline void io_tw_lock(struct io_ring_ctx *ctx, bool *locked)
+{
+	if (!*locked) {
+		mutex_lock(&ctx->uring_lock);
+		*locked = true;
+	}
+}
+
 #define io_for_each_link(pos, head) \
 	for (pos = (head); pos; pos = pos->link)
 
@@ -1193,16 +1201,19 @@ static void io_fallback_req_func(struct work_struct *work)
 						fallback_work.work);
 	struct llist_node *node = llist_del_all(&ctx->fallback_llist);
 	struct io_kiocb *req, *tmp;
+	bool locked = false;
 
 	percpu_ref_get(&ctx->refs);
 	llist_for_each_entry_safe(req, tmp, node, io_task_work.fallback_node)
-		req->io_task_work.func(req);
+		req->io_task_work.func(req, &locked);
 
-	mutex_lock(&ctx->uring_lock);
-	if (ctx->submit_state.compl_nr)
-		io_submit_flush_completions(ctx);
-	mutex_unlock(&ctx->uring_lock);
+	if (locked) {
+		if (ctx->submit_state.compl_nr)
+			io_submit_flush_completions(ctx);
+		mutex_unlock(&ctx->uring_lock);
+	}
 	percpu_ref_put(&ctx->refs);
 }
 
 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
@@ -1386,12 +1397,15 @@ static void io_prep_async_link(struct io_kiocb *req)
 	}
 }
 
-static void io_queue_async_work(struct io_kiocb *req)
+static void io_queue_async_work(struct io_kiocb *req, bool *locked)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 	struct io_kiocb *link = io_prep_linked_timeout(req);
 	struct io_uring_task *tctx = req->task->io_uring;
 
+	/* must not take the lock, NULL it as a precaution */
+	locked = NULL;
+
 	BUG_ON(!tctx);
 	BUG_ON(!tctx->io_wq);
@@ -2013,21 +2027,22 @@ static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
 	return __io_req_find_next(req);
 }
 
-static void ctx_flush_and_put(struct io_ring_ctx *ctx)
+static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked)
 {
 	if (!ctx)
 		return;
-	if (ctx->submit_state.compl_nr) {
-		mutex_lock(&ctx->uring_lock);
+	if (*locked) {
 		if (ctx->submit_state.compl_nr)
 			io_submit_flush_completions(ctx);
 		mutex_unlock(&ctx->uring_lock);
+		*locked = false;
 	}
 	percpu_ref_put(&ctx->refs);
 }
 
 static void tctx_task_work(struct callback_head *cb)
 {
+	bool locked = false;
 	struct io_ring_ctx *ctx = NULL;
 	struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
 						  task_work);
@@ -2050,18 +2065,18 @@ static void tctx_task_work(struct callback_head *cb)
 							    io_task_work.node);
 
 			if (req->ctx != ctx) {
-				ctx_flush_and_put(ctx);
+				ctx_flush_and_put(ctx, &locked);
 				ctx = req->ctx;
 				percpu_ref_get(&ctx->refs);
 			}
-			req->io_task_work.func(req);
+			req->io_task_work.func(req, &locked);
 			node = next;
 		} while (node);
 
 		cond_resched();
 	}
 
-	ctx_flush_and_put(ctx);
+	ctx_flush_and_put(ctx, &locked);
 }
 
 static void io_req_task_work_add(struct io_kiocb *req)
@@ -2113,28 +2128,26 @@ static void io_req_task_work_add(struct io_kiocb *req)
 	}
 }
 
-static void io_req_task_cancel(struct io_kiocb *req)
+static void io_req_task_cancel(struct io_kiocb *req, bool *locked)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 
 	/* ctx is guaranteed to stay alive while we hold uring_lock */
-	mutex_lock(&ctx->uring_lock);
+	io_tw_lock(ctx, locked);
 	io_req_complete_failed(req, req->result);
-	mutex_unlock(&ctx->uring_lock);
 }
 
-static void io_req_task_submit(struct io_kiocb *req)
+static void io_req_task_submit(struct io_kiocb *req, bool *locked)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 
 	/* ctx stays valid until unlock, even if we drop all ours ctx->refs */
-	mutex_lock(&ctx->uring_lock);
+	io_tw_lock(ctx, locked);
 	/* req->task == current here, checking PF_EXITING is safe */
 	if (likely(!(req->task->flags & PF_EXITING)))
 		__io_queue_sqe(req);
 	else
 		io_req_complete_failed(req, -EFAULT);
-	mutex_unlock(&ctx->uring_lock);
 }
 
 static void io_req_task_queue_fail(struct io_kiocb *req, int ret)
@@ -2170,6 +2183,11 @@ static void io_free_req(struct io_kiocb *req)
 	__io_free_req(req);
 }
 
+static void io_free_req_work(struct io_kiocb *req, bool *locked)
+{
+	io_free_req(req);
+}
+
 struct req_batch {
 	struct task_struct	*task;
 	int			task_refs;
@@ -2267,7 +2285,7 @@ static inline void io_put_req(struct io_kiocb *req)
 static inline void io_put_req_deferred(struct io_kiocb *req)
 {
 	if (req_ref_put_and_test(req)) {
-		req->io_task_work.func = io_free_req;
+		req->io_task_work.func = io_free_req_work;
 		io_req_task_work_add(req);
 	}
 }
@@ -2562,7 +2580,7 @@ static bool __io_complete_rw_common(struct io_kiocb *req, long res)
 	return false;
 }
 
-static void io_req_task_complete(struct io_kiocb *req)
+static void io_req_task_complete(struct io_kiocb *req, bool *locked)
 {
 	__io_req_complete(req, 0, req->result, io_put_rw_kbuf(req));
 }
@@ -2572,7 +2590,7 @@ static void __io_complete_rw(struct io_kiocb *req, long res, long res2,
 {
 	if (__io_complete_rw_common(req, res))
 		return;
-	io_req_task_complete(req);
+	__io_req_complete(req, 0, req->result, io_put_rw_kbuf(req));
 }
 
 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
@@ -4994,7 +5012,7 @@ static bool io_poll_complete(struct io_kiocb *req, __poll_t mask)
 	return !(flags & IORING_CQE_F_MORE);
 }
 
-static void io_poll_task_func(struct io_kiocb *req)
+static void io_poll_task_func(struct io_kiocb *req, bool *locked)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 	struct io_kiocb *nxt;
@@ -5018,7 +5036,7 @@ static void io_poll_task_func(struct io_kiocb *req)
 		if (done) {
 			nxt = io_put_req_find_next(req);
 			if (nxt)
-				io_req_task_submit(nxt);
+				io_req_task_submit(nxt, locked);
 		}
 	}
 }
@@ -5130,7 +5148,7 @@ static void io_async_queue_proc(struct file *file, struct wait_queue_head *head,
 	__io_queue_proc(&apoll->poll, pt, head, &apoll->double_poll);
 }
 
-static void io_async_task_func(struct io_kiocb *req)
+static void io_async_task_func(struct io_kiocb *req, bool *locked)
 {
 	struct async_poll *apoll = req->apoll;
 	struct io_ring_ctx *ctx = req->ctx;
@@ -5147,7 +5165,7 @@ static void io_async_task_func(struct io_kiocb *req)
 	spin_unlock(&ctx->completion_lock);
 
 	if (!READ_ONCE(apoll->poll.canceled))
-		io_req_task_submit(req);
+		io_req_task_submit(req, locked);
 	else
 		io_req_complete_failed(req, -ECANCELED);
 }
@@ -5546,7 +5564,7 @@ static int io_poll_update(struct io_kiocb *req, unsigned int issue_flags)
 	return 0;
 }
 
-static void io_req_task_timeout(struct io_kiocb *req)
+static void io_req_task_timeout(struct io_kiocb *req, bool *locked)
 {
 	req_set_fail(req);
 	io_req_complete_post(req, -ETIME, 0);
@@ -6103,7 +6121,7 @@ static bool io_drain_req(struct io_kiocb *req)
 	if (!req_need_defer(req, seq) && list_empty(&ctx->defer_list)) {
 		spin_unlock(&ctx->completion_lock);
 		kfree(de);
-		io_queue_async_work(req);
+		io_queue_async_work(req, NULL);
 		return true;
 	}
@@ -6425,7 +6443,7 @@ static inline struct file *io_file_get(struct io_ring_ctx *ctx,
 		return io_file_get_normal(ctx, req, fd);
 }
 
-static void io_req_task_link_timeout(struct io_kiocb *req)
+static void io_req_task_link_timeout(struct io_kiocb *req, bool *locked)
 {
 	struct io_kiocb *prev = req->timeout.prev;
 	int ret;
@@ -6529,7 +6547,7 @@ static void __io_queue_sqe(struct io_kiocb *req)
 			 * Queued up for async execution, worker will release
 			 * submit reference when the iocb is actually submitted.
 			 */
-			io_queue_async_work(req);
+			io_queue_async_work(req, NULL);
 			break;
 		}
@@ -6554,7 +6572,7 @@ static inline void io_queue_sqe(struct io_kiocb *req)
 		if (unlikely(ret))
 			io_req_complete_failed(req, ret);
 		else
-			io_queue_async_work(req);
+			io_queue_async_work(req, NULL);
 	}
 }