Commit 94ae5e77 authored by Jens Axboe's avatar Jens Axboe

io_uring: fix sequencing issues with linked timeouts

We have an issue with timeout links that are deeper in the submit chain,
because we only handle it upfront, not from later submissions. Move the
prep + issue of the timeout link to the async work prep handler, and do
it normally for non-async queue. If we validate and prepare the timeout
links upfront when we first see them, there's nothing stopping us from
supporting any sort of nesting.

Fixes: 2665abfd ("io_uring: add support for linked SQE timeouts")
Reported-by: default avatarPavel Begunkov <asml.silence@gmail.com>
Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent ad8a48ac
......@@ -353,6 +353,7 @@ struct io_kiocb {
#define REQ_F_TIMEOUT_NOSEQ 8192 /* no timeout sequence */
#define REQ_F_INFLIGHT 16384 /* on inflight list */
#define REQ_F_COMP_LOCKED 32768 /* completion under lock */
#define REQ_F_FREE_SQE 65536 /* free sqe if not async queued */
u64 user_data;
u32 result;
u32 sequence;
......@@ -391,6 +392,8 @@ static void __io_free_req(struct io_kiocb *req);
static void io_put_req(struct io_kiocb *req);
static void io_double_put_req(struct io_kiocb *req);
static void __io_double_put_req(struct io_kiocb *req);
static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req);
static void io_queue_linked_timeout(struct io_kiocb *req);
static struct kmem_cache *req_cachep;
......@@ -529,7 +532,8 @@ static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
opcode == IORING_OP_WRITE_FIXED);
}
static inline bool io_prep_async_work(struct io_kiocb *req)
static inline bool io_prep_async_work(struct io_kiocb *req,
struct io_kiocb **link)
{
bool do_hashed = false;
......@@ -558,13 +562,17 @@ static inline bool io_prep_async_work(struct io_kiocb *req)
req->work.flags |= IO_WQ_WORK_NEEDS_USER;
}
*link = io_prep_linked_timeout(req);
return do_hashed;
}
static inline void io_queue_async_work(struct io_kiocb *req)
{
bool do_hashed = io_prep_async_work(req);
struct io_ring_ctx *ctx = req->ctx;
struct io_kiocb *link;
bool do_hashed;
do_hashed = io_prep_async_work(req, &link);
trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
req->flags);
......@@ -574,6 +582,9 @@ static inline void io_queue_async_work(struct io_kiocb *req)
io_wq_enqueue_hashed(ctx->io_wq, &req->work,
file_inode(req->file));
}
if (link)
io_queue_linked_timeout(link);
}
static void io_kill_timeout(struct io_kiocb *req)
......@@ -875,6 +886,15 @@ static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb, list);
while (nxt) {
list_del_init(&nxt->list);
if ((req->flags & REQ_F_LINK_TIMEOUT) &&
(nxt->flags & REQ_F_TIMEOUT)) {
wake_ev |= io_link_cancel_timeout(nxt);
nxt = list_first_entry_or_null(&req->link_list,
struct io_kiocb, list);
req->flags &= ~REQ_F_LINK_TIMEOUT;
continue;
}
if (!list_empty(&req->link_list)) {
INIT_LIST_HEAD(&nxt->link_list);
list_splice(&req->link_list, &nxt->link_list);
......@@ -885,19 +905,13 @@ static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
* If we're in async work, we can continue processing the chain
* in this context instead of having to queue up new async work.
*/
if (req->flags & REQ_F_LINK_TIMEOUT) {
wake_ev = io_link_cancel_timeout(nxt);
/* we dropped this link, get next */
nxt = list_first_entry_or_null(&req->link_list,
struct io_kiocb, list);
} else if (nxtptr && io_wq_current_is_worker()) {
if (nxt) {
if (nxtptr && io_wq_current_is_worker())
*nxtptr = nxt;
break;
} else {
else
io_queue_async_work(nxt);
break;
}
break;
}
if (wake_ev)
......@@ -916,11 +930,16 @@ static void io_fail_links(struct io_kiocb *req)
spin_lock_irqsave(&ctx->completion_lock, flags);
while (!list_empty(&req->link_list)) {
const struct io_uring_sqe *sqe_to_free = NULL;
link = list_first_entry(&req->link_list, struct io_kiocb, list);
list_del_init(&link->list);
trace_io_uring_fail_link(req, link);
if (link->flags & REQ_F_FREE_SQE)
sqe_to_free = link->submit.sqe;
if ((req->flags & REQ_F_LINK_TIMEOUT) &&
link->submit.sqe->opcode == IORING_OP_LINK_TIMEOUT) {
io_link_cancel_timeout(link);
......@@ -928,6 +947,7 @@ static void io_fail_links(struct io_kiocb *req)
io_cqring_fill_event(link, -ECANCELED);
__io_double_put_req(link);
}
kfree(sqe_to_free);
}
io_commit_cqring(ctx);
......@@ -2683,8 +2703,12 @@ static void io_wq_submit_work(struct io_wq_work **workptr)
/* if a dependent link is ready, pass it back */
if (!ret && nxt) {
io_prep_async_work(nxt);
struct io_kiocb *link;
io_prep_async_work(nxt, &link);
*workptr = &nxt->work;
if (link)
io_queue_linked_timeout(link);
}
}
......@@ -2819,7 +2843,6 @@ static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
static void io_queue_linked_timeout(struct io_kiocb *req)
{
struct io_timeout_data *data = req->timeout.data;
struct io_ring_ctx *ctx = req->ctx;
/*
......@@ -2828,6 +2851,8 @@ static void io_queue_linked_timeout(struct io_kiocb *req)
*/
spin_lock_irq(&ctx->completion_lock);
if (!list_empty(&req->list)) {
struct io_timeout_data *data = req->timeout.data;
data->timer.function = io_link_timeout_fn;
hrtimer_start(&data->timer, timespec64_to_ktime(data->ts),
data->mode);
......@@ -2841,7 +2866,6 @@ static void io_queue_linked_timeout(struct io_kiocb *req)
static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
{
struct io_kiocb *nxt;
int ret;
if (!(req->flags & REQ_F_LINK))
return NULL;
......@@ -2850,33 +2874,15 @@ static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
if (!nxt || nxt->submit.sqe->opcode != IORING_OP_LINK_TIMEOUT)
return NULL;
ret = io_timeout_setup(nxt);
/* common setup allows offset being set, we don't */
if (!ret && nxt->submit.sqe->off)
ret = -EINVAL;
if (ret) {
list_del_init(&nxt->list);
io_cqring_add_event(nxt, ret);
io_double_put_req(nxt);
return ERR_PTR(-ECANCELED);
}
req->flags |= REQ_F_LINK_TIMEOUT;
return nxt;
}
static void __io_queue_sqe(struct io_kiocb *req)
{
struct io_kiocb *nxt;
struct io_kiocb *nxt = io_prep_linked_timeout(req);
int ret;
nxt = io_prep_linked_timeout(req);
if (IS_ERR(nxt)) {
ret = PTR_ERR(nxt);
nxt = NULL;
goto err;
}
ret = __io_submit_sqe(req, NULL, true);
/*
......@@ -2904,10 +2910,6 @@ static void __io_queue_sqe(struct io_kiocb *req)
* submit reference when the iocb is actually submitted.
*/
io_queue_async_work(req);
if (nxt)
io_queue_linked_timeout(nxt);
return;
}
}
......@@ -2952,6 +2954,10 @@ static void io_queue_link_head(struct io_kiocb *req, struct io_kiocb *shadow)
int need_submit = false;
struct io_ring_ctx *ctx = req->ctx;
if (unlikely(req->flags & REQ_F_FAIL_LINK)) {
ret = -ECANCELED;
goto err;
}
if (!shadow) {
io_queue_sqe(req);
return;
......@@ -2966,8 +2972,10 @@ static void io_queue_link_head(struct io_kiocb *req, struct io_kiocb *shadow)
ret = io_req_defer(req);
if (ret) {
if (ret != -EIOCBQUEUED) {
err:
io_cqring_add_event(req, ret);
io_double_put_req(req);
if (shadow)
__io_free_req(shadow);
return;
}
......@@ -3025,6 +3033,17 @@ static void io_submit_sqe(struct io_kiocb *req, struct io_submit_state *state,
if (*link) {
struct io_kiocb *prev = *link;
if (READ_ONCE(s->sqe->opcode) == IORING_OP_LINK_TIMEOUT) {
ret = io_timeout_setup(req);
/* common setup allows offset being set, we don't */
if (!ret && s->sqe->off)
ret = -EINVAL;
if (ret) {
prev->flags |= REQ_F_FAIL_LINK;
goto err_req;
}
}
sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL);
if (!sqe_copy) {
ret = -EAGAIN;
......@@ -3032,6 +3051,7 @@ static void io_submit_sqe(struct io_kiocb *req, struct io_submit_state *state,
}
s->sqe = sqe_copy;
req->flags |= REQ_F_FREE_SQE;
trace_io_uring_link(ctx, req, prev);
list_add_tail(&req->list, &prev->link_list);
} else if (s->sqe->flags & IOSQE_IO_LINK) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment