Commit e94f141b authored by Jens Axboe's avatar Jens Axboe

io_uring: improve poll completion performance

For busy IORING_OP_POLL_ADD workloads, we can have enough contention
on the completion lock that we fail the inline completion path quite
often as we fail the trylock on that lock. Add a list for deferred
completions that we can use in that case. This helps reduce the number
of async offloads we have to do, as if we get multiple completions in
a row, we'll piggy back on to the poll_llist instead of having to queue
our own offload.
Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent ad3eb2c8
......@@ -286,7 +286,8 @@ struct io_ring_ctx {
struct {
spinlock_t completion_lock;
bool poll_multi_file;
struct llist_head poll_llist;
/*
* ->poll_list is protected by the ctx->uring_lock for
* io_uring instances that don't use IORING_SETUP_SQPOLL.
......@@ -296,6 +297,7 @@ struct io_ring_ctx {
struct list_head poll_list;
struct hlist_head *cancel_hash;
unsigned cancel_hash_bits;
bool poll_multi_file;
spinlock_t inflight_lock;
struct list_head inflight_list;
......@@ -453,7 +455,14 @@ struct io_kiocb {
};
struct io_async_ctx *io;
struct file *ring_file;
union {
/*
* ring_file is only used in the submission path, and
* llist_node is only used for poll deferred completions
*/
struct file *ring_file;
struct llist_node llist_node;
};
int ring_fd;
bool has_user;
bool in_async;
......@@ -725,6 +734,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
mutex_init(&ctx->uring_lock);
init_waitqueue_head(&ctx->wait);
spin_lock_init(&ctx->completion_lock);
init_llist_head(&ctx->poll_llist);
INIT_LIST_HEAD(&ctx->poll_list);
INIT_LIST_HEAD(&ctx->defer_list);
INIT_LIST_HEAD(&ctx->timeout_list);
......@@ -1320,6 +1330,20 @@ static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
}
static inline bool io_req_multi_free(struct io_kiocb *req)
{
/*
* If we're not using fixed files, we have to pair the completion part
* with the file put. Use regular completions for those, only batch
* free for fixed file and non-linked commands.
*/
if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) == REQ_F_FIXED_FILE)
&& !io_is_fallback_req(req) && !req->io)
return true;
return false;
}
/*
* Find and free completed poll iocbs
*/
......@@ -1339,14 +1363,7 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
(*nr_events)++;
if (refcount_dec_and_test(&req->refs)) {
/* If we're not using fixed files, we have to pair the
* completion part with the file put. Use regular
* completions for those, only batch free for fixed
* file and non-linked commands.
*/
if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) ==
REQ_F_FIXED_FILE) && !io_is_fallback_req(req) &&
!req->io) {
if (io_req_multi_free(req)) {
reqs[to_free++] = req;
if (to_free == ARRAY_SIZE(reqs))
io_free_req_many(ctx, reqs, &to_free);
......@@ -3081,6 +3098,44 @@ static void io_poll_complete_work(struct io_wq_work **workptr)
io_wq_assign_next(workptr, nxt);
}
static void __io_poll_flush(struct io_ring_ctx *ctx, struct llist_node *nodes)
{
void *reqs[IO_IOPOLL_BATCH];
struct io_kiocb *req, *tmp;
int to_free = 0;
spin_lock_irq(&ctx->completion_lock);
llist_for_each_entry_safe(req, tmp, nodes, llist_node) {
hash_del(&req->hash_node);
io_poll_complete(req, req->result, 0);
if (refcount_dec_and_test(&req->refs)) {
if (io_req_multi_free(req)) {
reqs[to_free++] = req;
if (to_free == ARRAY_SIZE(reqs))
io_free_req_many(ctx, reqs, &to_free);
} else {
req->flags |= REQ_F_COMP_LOCKED;
io_free_req(req);
}
}
}
spin_unlock_irq(&ctx->completion_lock);
io_cqring_ev_posted(ctx);
io_free_req_many(ctx, reqs, &to_free);
}
static void io_poll_flush(struct io_wq_work **workptr)
{
struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
struct llist_node *nodes;
nodes = llist_del_all(&req->ctx->poll_llist);
if (nodes)
__io_poll_flush(req->ctx, nodes);
}
static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
void *key)
{
......@@ -3088,7 +3143,6 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
struct io_ring_ctx *ctx = req->ctx;
__poll_t mask = key_to_poll(key);
unsigned long flags;
/* for instances that support it check for an event match first: */
if (mask && !(mask & poll->events))
......@@ -3102,17 +3156,31 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
* If we have a link timeout we're going to need the completion_lock
* for finalizing the request, mark us as having grabbed that already.
*/
if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
hash_del(&req->hash_node);
io_poll_complete(req, mask, 0);
req->flags |= REQ_F_COMP_LOCKED;
io_put_req(req);
spin_unlock_irqrestore(&ctx->completion_lock, flags);
if (mask) {
unsigned long flags;
io_cqring_ev_posted(ctx);
} else {
io_queue_async_work(req);
if (llist_empty(&ctx->poll_llist) &&
spin_trylock_irqsave(&ctx->completion_lock, flags)) {
hash_del(&req->hash_node);
io_poll_complete(req, mask, 0);
req->flags |= REQ_F_COMP_LOCKED;
io_put_req(req);
spin_unlock_irqrestore(&ctx->completion_lock, flags);
io_cqring_ev_posted(ctx);
req = NULL;
} else {
req->result = mask;
req->llist_node.next = NULL;
/* if the list wasn't empty, we're done */
if (!llist_add(&req->llist_node, &ctx->poll_llist))
req = NULL;
else
req->work.func = io_poll_flush;
}
}
if (req)
io_queue_async_work(req);
return 1;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment