Commit eac406c6 authored by Jens Axboe's avatar Jens Axboe

io_uring: make POLL_ADD/POLL_REMOVE scale better

One of the obvious use cases for these commands is networking, where
it's not uncommon to have tons of sockets open and polled for. The
current implementation uses a list for insertion and lookup, which works
fine for file based use cases where the count is usually low, it breaks
down somewhat for higher number of files / sockets. A test case with
30k sockets being polled for and cancelled takes:

real    0m6.968s
user    0m0.002s
sys     0m6.936s

with the patch it takes:

real    0m0.233s
user    0m0.010s
sys     0m0.176s

If you go to 50k sockets, it gets even more abysmal with the current
code:

real    0m40.602s
user    0m0.010s
sys     0m40.555s

with the patch it takes:

real    0m0.398s
user    0m0.000s
sys     0m0.341s

Change is pretty straight forward, just replace the cancel_list with
a red/black tree instead.
Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent 021d1cdd
...@@ -271,7 +271,7 @@ struct io_ring_ctx { ...@@ -271,7 +271,7 @@ struct io_ring_ctx {
* manipulate the list, hence no extra locking is needed there. * manipulate the list, hence no extra locking is needed there.
*/ */
struct list_head poll_list; struct list_head poll_list;
struct list_head cancel_list; struct rb_root cancel_tree;
spinlock_t inflight_lock; spinlock_t inflight_lock;
struct list_head inflight_list; struct list_head inflight_list;
...@@ -323,7 +323,10 @@ struct io_kiocb { ...@@ -323,7 +323,10 @@ struct io_kiocb {
struct sqe_submit submit; struct sqe_submit submit;
struct io_ring_ctx *ctx; struct io_ring_ctx *ctx;
struct list_head list; union {
struct list_head list;
struct rb_node rb_node;
};
struct list_head link_list; struct list_head link_list;
unsigned int flags; unsigned int flags;
refcount_t refs; refcount_t refs;
...@@ -433,7 +436,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) ...@@ -433,7 +436,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
init_waitqueue_head(&ctx->wait); init_waitqueue_head(&ctx->wait);
spin_lock_init(&ctx->completion_lock); spin_lock_init(&ctx->completion_lock);
INIT_LIST_HEAD(&ctx->poll_list); INIT_LIST_HEAD(&ctx->poll_list);
INIT_LIST_HEAD(&ctx->cancel_list); ctx->cancel_tree = RB_ROOT;
INIT_LIST_HEAD(&ctx->defer_list); INIT_LIST_HEAD(&ctx->defer_list);
INIT_LIST_HEAD(&ctx->timeout_list); INIT_LIST_HEAD(&ctx->timeout_list);
init_waitqueue_head(&ctx->inflight_wait); init_waitqueue_head(&ctx->inflight_wait);
...@@ -1934,6 +1937,14 @@ static int io_accept(struct io_kiocb *req, const struct io_uring_sqe *sqe, ...@@ -1934,6 +1937,14 @@ static int io_accept(struct io_kiocb *req, const struct io_uring_sqe *sqe,
#endif #endif
} }
static inline void io_poll_remove_req(struct io_kiocb *req)
{
if (!RB_EMPTY_NODE(&req->rb_node)) {
rb_erase(&req->rb_node, &req->ctx->cancel_tree);
RB_CLEAR_NODE(&req->rb_node);
}
}
static void io_poll_remove_one(struct io_kiocb *req) static void io_poll_remove_one(struct io_kiocb *req)
{ {
struct io_poll_iocb *poll = &req->poll; struct io_poll_iocb *poll = &req->poll;
...@@ -1945,17 +1956,17 @@ static void io_poll_remove_one(struct io_kiocb *req) ...@@ -1945,17 +1956,17 @@ static void io_poll_remove_one(struct io_kiocb *req)
io_queue_async_work(req); io_queue_async_work(req);
} }
spin_unlock(&poll->head->lock); spin_unlock(&poll->head->lock);
io_poll_remove_req(req);
list_del_init(&req->list);
} }
static void io_poll_remove_all(struct io_ring_ctx *ctx) static void io_poll_remove_all(struct io_ring_ctx *ctx)
{ {
struct rb_node *node;
struct io_kiocb *req; struct io_kiocb *req;
spin_lock_irq(&ctx->completion_lock); spin_lock_irq(&ctx->completion_lock);
while (!list_empty(&ctx->cancel_list)) { while ((node = rb_first(&ctx->cancel_tree)) != NULL) {
req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list); req = rb_entry(node, struct io_kiocb, rb_node);
io_poll_remove_one(req); io_poll_remove_one(req);
} }
spin_unlock_irq(&ctx->completion_lock); spin_unlock_irq(&ctx->completion_lock);
...@@ -1963,13 +1974,21 @@ static void io_poll_remove_all(struct io_ring_ctx *ctx) ...@@ -1963,13 +1974,21 @@ static void io_poll_remove_all(struct io_ring_ctx *ctx)
static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr) static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr)
{ {
struct rb_node *p, *parent = NULL;
struct io_kiocb *req; struct io_kiocb *req;
list_for_each_entry(req, &ctx->cancel_list, list) { p = ctx->cancel_tree.rb_node;
if (req->user_data != sqe_addr) while (p) {
continue; parent = p;
io_poll_remove_one(req); req = rb_entry(parent, struct io_kiocb, rb_node);
return 0; if (sqe_addr < req->user_data) {
p = p->rb_left;
} else if (sqe_addr > req->user_data) {
p = p->rb_right;
} else {
io_poll_remove_one(req);
return 0;
}
} }
return -ENOENT; return -ENOENT;
...@@ -2039,7 +2058,7 @@ static void io_poll_complete_work(struct io_wq_work **workptr) ...@@ -2039,7 +2058,7 @@ static void io_poll_complete_work(struct io_wq_work **workptr)
spin_unlock_irq(&ctx->completion_lock); spin_unlock_irq(&ctx->completion_lock);
return; return;
} }
list_del_init(&req->list); io_poll_remove_req(req);
io_poll_complete(req, mask); io_poll_complete(req, mask);
spin_unlock_irq(&ctx->completion_lock); spin_unlock_irq(&ctx->completion_lock);
...@@ -2073,7 +2092,7 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, ...@@ -2073,7 +2092,7 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
* for finalizing the request, mark us as having grabbed that already. * for finalizing the request, mark us as having grabbed that already.
*/ */
if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) { if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
list_del(&req->list); io_poll_remove_req(req);
io_poll_complete(req, mask); io_poll_complete(req, mask);
req->flags |= REQ_F_COMP_LOCKED; req->flags |= REQ_F_COMP_LOCKED;
io_put_req(req); io_put_req(req);
...@@ -2108,6 +2127,25 @@ static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head, ...@@ -2108,6 +2127,25 @@ static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
add_wait_queue(head, &pt->req->poll.wait); add_wait_queue(head, &pt->req->poll.wait);
} }
static void io_poll_req_insert(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
struct rb_node **p = &ctx->cancel_tree.rb_node;
struct rb_node *parent = NULL;
struct io_kiocb *tmp;
while (*p) {
parent = *p;
tmp = rb_entry(parent, struct io_kiocb, rb_node);
if (req->user_data < tmp->user_data)
p = &(*p)->rb_left;
else
p = &(*p)->rb_right;
}
rb_link_node(&req->rb_node, parent, p);
rb_insert_color(&req->rb_node, &ctx->cancel_tree);
}
static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe, static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe,
struct io_kiocb **nxt) struct io_kiocb **nxt)
{ {
...@@ -2129,6 +2167,7 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe, ...@@ -2129,6 +2167,7 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe,
INIT_IO_WORK(&req->work, io_poll_complete_work); INIT_IO_WORK(&req->work, io_poll_complete_work);
events = READ_ONCE(sqe->poll_events); events = READ_ONCE(sqe->poll_events);
poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP; poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
RB_CLEAR_NODE(&req->rb_node);
poll->head = NULL; poll->head = NULL;
poll->done = false; poll->done = false;
...@@ -2161,7 +2200,7 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe, ...@@ -2161,7 +2200,7 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe,
else if (cancel) else if (cancel)
WRITE_ONCE(poll->canceled, true); WRITE_ONCE(poll->canceled, true);
else if (!poll->done) /* actually waiting for an event */ else if (!poll->done) /* actually waiting for an event */
list_add_tail(&req->list, &ctx->cancel_list); io_poll_req_insert(req);
spin_unlock(&poll->head->lock); spin_unlock(&poll->head->lock);
} }
if (mask) { /* no async, we'd stolen it */ if (mask) { /* no async, we'd stolen it */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment