Commit ad3eb2c8 authored by Jens Axboe

io_uring: split overflow state into SQ and CQ side

We currently check ->cq_overflow_list from both SQ and CQ context, which
causes some bouncing of that cache line. Add separate bits of state for
this instead, so that the SQ side can check using its own state, and
likewise for the CQ side.

This adds ->sq_check_overflow with the SQ state, and ->cq_check_overflow
with the CQ state. If we hit an overflow condition, both of these bits
are set. Likewise, when the overflow list is fully flushed, we clear both
bits. For the fast path of just checking whether there's an overflow
condition on either the SQ or CQ side, each side can now use its own
private bit.
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent d3656344
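
The pattern being applied here is general: a flag that two otherwise-independent contexts both poll ends up bouncing between their caches, so the flag is duplicated into one copy per context, with the slow path updating both copies and each fast path reading only its own. Below is a minimal userspace C sketch of that idea; it is not the kernel code, and the names (demo_ctx, demo_mark_overflow, and so on) are hypothetical. The kernel patch gets the same effect with set_bit()/clear_bit()/test_bit() on two separate unsigned long words, ->sq_check_overflow sitting with the submission-side fields and ->cq_check_overflow with the completion-side fields; the sketch uses C11 atomics instead for portability.

/* Split-flag sketch: one overflow flag per side, each on its own cache line. */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define CACHELINE 64

struct demo_ctx {
	/* Submission-side block: only the submitter's fast path reads this. */
	struct {
		_Alignas(CACHELINE) atomic_bool sq_check_overflow;
		/* ...other submission-side fields would live here... */
	} sq;

	/* Completion-side block: only the completion/wait path reads this. */
	struct {
		_Alignas(CACHELINE) atomic_bool cq_check_overflow;
		/* ...other completion-side fields would live here... */
	} cq;
};

/* Slow path: an overflow happened; mark both sides so each notices locally. */
static void demo_mark_overflow(struct demo_ctx *ctx)
{
	atomic_store(&ctx->sq.sq_check_overflow, true);
	atomic_store(&ctx->cq.cq_check_overflow, true);
}

/* Slow path: the overflow backlog is fully flushed; clear both sides. */
static void demo_clear_overflow(struct demo_ctx *ctx)
{
	atomic_store(&ctx->sq.sq_check_overflow, false);
	atomic_store(&ctx->cq.cq_check_overflow, false);
}

/* Fast paths: each side touches only the flag on its own cache line. */
static bool demo_sq_has_backlog(struct demo_ctx *ctx)
{
	return atomic_load(&ctx->sq.sq_check_overflow);
}

static bool demo_cq_has_backlog(struct demo_ctx *ctx)
{
	return atomic_load(&ctx->cq.cq_check_overflow);
}

int main(void)
{
	static struct demo_ctx ctx;	/* zero-initialized: no backlog pending */

	demo_mark_overflow(&ctx);
	printf("sq sees backlog: %d, cq sees backlog: %d\n",
	       demo_sq_has_backlog(&ctx), demo_cq_has_backlog(&ctx));

	demo_clear_overflow(&ctx);
	printf("after flush, sq: %d, cq: %d\n",
	       demo_sq_has_backlog(&ctx), demo_cq_has_backlog(&ctx));
	return 0;
}

The cost is that the rarely-taken overflow and flush paths now write two flags instead of one, which is a good trade when the flags are checked far more often than they change.
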
fs/io_uring.c
@@ -224,13 +224,14 @@ struct io_ring_ctx {
 		unsigned		sq_thread_idle;
 		unsigned		cached_sq_dropped;
 		atomic_t		cached_cq_overflow;
-		struct io_uring_sqe	*sq_sqes;
+		unsigned long		sq_check_overflow;
 
 		struct list_head	defer_list;
 		struct list_head	timeout_list;
 		struct list_head	cq_overflow_list;
 
 		wait_queue_head_t	inflight_wait;
+		struct io_uring_sqe	*sq_sqes;
 	} ____cacheline_aligned_in_smp;
 
 	struct io_rings	*rings;
@@ -272,6 +273,7 @@ struct io_ring_ctx {
 		unsigned		cq_entries;
 		unsigned		cq_mask;
 		atomic_t		cq_timeouts;
+		unsigned long		cq_check_overflow;
 		struct wait_queue_head	cq_wait;
 		struct fasync_struct	*cq_fasync;
 		struct eventfd_ctx	*cq_ev_fd;
@@ -950,6 +952,10 @@ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
 	}
 
 	io_commit_cqring(ctx);
+	if (cqe) {
+		clear_bit(0, &ctx->sq_check_overflow);
+		clear_bit(0, &ctx->cq_check_overflow);
+	}
 	spin_unlock_irqrestore(&ctx->completion_lock, flags);
 	io_cqring_ev_posted(ctx);
 
@@ -983,6 +989,10 @@ static void io_cqring_fill_event(struct io_kiocb *req, long res)
 		WRITE_ONCE(ctx->rings->cq_overflow,
 				atomic_inc_return(&ctx->cached_cq_overflow));
 	} else {
+		if (list_empty(&ctx->cq_overflow_list)) {
+			set_bit(0, &ctx->sq_check_overflow);
+			set_bit(0, &ctx->cq_check_overflow);
+		}
 		refcount_inc(&req->refs);
 		req->result = res;
 		list_add_tail(&req->list, &ctx->cq_overflow_list);
@@ -1285,19 +1295,21 @@ static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush)
 {
 	struct io_rings *rings = ctx->rings;
 
-	/*
-	 * noflush == true is from the waitqueue handler, just ensure we wake
-	 * up the task, and the next invocation will flush the entries. We
-	 * cannot safely to it from here.
-	 */
-	if (noflush && !list_empty(&ctx->cq_overflow_list))
-		return -1U;
+	if (test_bit(0, &ctx->cq_check_overflow)) {
+		/*
+		 * noflush == true is from the waitqueue handler, just ensure
+		 * we wake up the task, and the next invocation will flush the
+		 * entries. We cannot safely to it from here.
+		 */
+		if (noflush && !list_empty(&ctx->cq_overflow_list))
+			return -1U;
 
-	io_cqring_overflow_flush(ctx, false);
+		io_cqring_overflow_flush(ctx, false);
+	}
 
 	/* See comment at the top of this file */
 	smp_rmb();
-	return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
+	return ctx->cached_cq_tail - READ_ONCE(rings->cq.head);
 }
 
 static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
@@ -4309,9 +4321,11 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
 	bool mm_fault = false;
 
 	/* if we have a backlog and couldn't flush it all, return BUSY */
-	if (!list_empty(&ctx->cq_overflow_list) &&
-	    !io_cqring_overflow_flush(ctx, false))
-		return -EBUSY;
+	if (test_bit(0, &ctx->sq_check_overflow)) {
+		if (!list_empty(&ctx->cq_overflow_list) &&
+		    !io_cqring_overflow_flush(ctx, false))
+			return -EBUSY;
+	}
 
 	if (nr > IO_PLUG_THRESHOLD) {
 		io_submit_state_start(&state, nr);