// SPDX-License-Identifier: GPL-2.0
/*
 * Shared application/kernel submission and completion ring pairs, for
 * supporting fast/efficient IO.
 *
 * A note on the read/write ordering memory barriers that are matched between
 * the application and kernel side.
 *
 * After the application reads the CQ ring tail, it must use an
 * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
 * before writing the tail (using smp_load_acquire to read the tail will
 * do). It also needs a smp_mb() before updating CQ head (ordering the
 * entry load(s) with the head store), pairing with an implicit barrier
 * through a control-dependency in io_get_cqe (smp_store_release to
 * store head will do). Failure to do so could lead to reading invalid
 * CQ entries.
 *
 * Likewise, the application must use an appropriate smp_wmb() before
 * writing the SQ tail (ordering SQ entry stores with the tail store),
 * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
 * to store the tail will do). And it needs a barrier ordering the SQ
 * head load before writing new SQ entries (smp_load_acquire to read
 * head will do).
 *
 * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
 * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
 * updating the SQ tail; a full memory barrier smp_mb() is needed
 * between.
 *
 * Also see the examples in the liburing library:
 *
 *	git://git.kernel.dk/liburing
 *
 * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
 * from data shared between the kernel and application. This is done both
 * for ordering purposes, but also to ensure that once a value is loaded from
 * data that the application could potentially modify, it remains stable.
 *
 * Copyright (C) 2018-2019 Jens Axboe
 * Copyright (c) 2018-2019 Christoph Hellwig
 */
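
/*
 * Illustrative sketch only (added commentary, not part of the original
 * source): a userspace consumer draining the CQ ring with the ordering
 * described above. Field names (khead, ktail, ring_mask, cqes) follow
 * liburing-style conventions and are assumptions here:
 *
 *	unsigned head = *cq->khead;
 *	unsigned tail = smp_load_acquire(cq->ktail);	// pairs with the kernel's tail store
 *
 *	while (head != tail) {
 *		struct io_uring_cqe *cqe = &cq->cqes[head & *cq->ring_mask];
 *		handle_cqe(cqe);	// consume the entry before publishing the new head
 *		head++;
 *	}
 *	smp_store_release(cq->khead, head);	// orders the entry loads before the head store
 */
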
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/syscalls.h>
#include <net/compat.h>
#include <linux/refcount.h>
#include <linux/uio.h>
#include <linux/bits.h>

#include <linux/sched/signal.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/fdtable.h>
#include <linux/mm.h>
#include <linux/mman.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/bvec.h>
#include <linux/net.h>
#include <net/sock.h>
#include <linux/anon_inodes.h>
#include <linux/sched/mm.h>
#include <linux/uaccess.h>
#include <linux/nospec.h>
#include <linux/fsnotify.h>
#include <linux/fadvise.h>
#include <linux/task_work.h>
#include <linux/io_uring.h>
#include <linux/io_uring/cmd.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <asm/shmparam.h>

#define CREATE_TRACE_POINTS
#include <trace/events/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io-wq.h"

#include "io_uring.h"
#include "opdef.h"
#include "refs.h"
#include "tctx.h"
#include "register.h"
#include "sqpoll.h"
#include "fdinfo.h"
#include "kbuf.h"
#include "rsrc.h"
#include "cancel.h"
#include "net.h"
#include "notif.h"
#include "waitid.h"
#include "futex.h"
#include "napi.h"
#include "uring_cmd.h"
#include "memmap.h"

#include "timeout.h"
#include "poll.h"
#include "rw.h"
#include "alloc_cache.h"

#define IORING_MAX_ENTRIES	32768
#define IORING_MAX_CQ_ENTRIES	(2 * IORING_MAX_ENTRIES)

#define SQE_COMMON_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_LINK | \
			  IOSQE_IO_HARDLINK | IOSQE_ASYNC)

#define SQE_VALID_FLAGS	(SQE_COMMON_FLAGS | IOSQE_BUFFER_SELECT | \
			IOSQE_IO_DRAIN | IOSQE_CQE_SKIP_SUCCESS)

#define IO_REQ_CLEAN_FLAGS (REQ_F_BUFFER_SELECTED | REQ_F_NEED_CLEANUP | \
				REQ_F_POLLED | REQ_F_INFLIGHT | REQ_F_CREDS | \
				REQ_F_ASYNC_DATA)

#define IO_REQ_CLEAN_SLOW_FLAGS (REQ_F_REFCOUNT | REQ_F_LINK | REQ_F_HARDLINK |\
				 IO_REQ_CLEAN_FLAGS)

#define IO_TCTX_REFS_CACHE_NR	(1U << 10)

#define IO_COMPL_BATCH			32
#define IO_REQ_ALLOC_BATCH		8

struct io_defer_entry {
	struct list_head	list;
	struct io_kiocb		*req;
	u32			seq;
};

/* requests with any of those set should undergo io_disarm_next() */
#define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
#define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK)

/*
 * No waiters. It's larger than any valid value of the tw counter
 * so that tests against ->cq_wait_nr would fail and skip wake_up().
 */
#define IO_CQ_WAKE_INIT		(-1U)
/* Forced wake up if there is a waiter regardless of ->cq_wait_nr */
#define IO_CQ_WAKE_FORCE	(IO_CQ_WAKE_INIT >> 1)

static bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
					 struct task_struct *task,
					 bool cancel_all);

static void io_queue_sqe(struct io_kiocb *req);

struct kmem_cache *req_cachep;
static struct workqueue_struct *iou_wq __ro_after_init;

static int __read_mostly sysctl_io_uring_disabled;
static int __read_mostly sysctl_io_uring_group = -1;

#ifdef CONFIG_SYSCTL
static struct ctl_table kernel_io_uring_disabled_table[] = {
	{
		.procname	= "io_uring_disabled",
		.data		= &sysctl_io_uring_disabled,
		.maxlen		= sizeof(sysctl_io_uring_disabled),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= SYSCTL_ZERO,
		.extra2		= SYSCTL_TWO,
	},
	{
		.procname	= "io_uring_group",
		.data		= &sysctl_io_uring_group,
		.maxlen		= sizeof(gid_t),
		.mode		= 0644,
		.proc_handler	= proc_dointvec,
	},
};
#endif

static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
{
	return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
}

static inline unsigned int __io_cqring_events_user(struct io_ring_ctx *ctx)
{
	return READ_ONCE(ctx->rings->cq.tail) - READ_ONCE(ctx->rings->cq.head);
}

static bool io_match_linked(struct io_kiocb *head)
{
	struct io_kiocb *req;

	io_for_each_link(req, head) {
		if (req->flags & REQ_F_INFLIGHT)
			return true;
	}
	return false;
}

/*
 * As io_match_task() but protected against racing with linked timeouts.
 * User must not hold timeout_lock.
 */
bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task,
			bool cancel_all)
{
	bool matched;

	if (task && head->task != task)
		return false;
	if (cancel_all)
		return true;

	if (head->flags & REQ_F_LINK_TIMEOUT) {
		struct io_ring_ctx *ctx = head->ctx;

		/* protect against races with linked timeouts */
		spin_lock_irq(&ctx->timeout_lock);
		matched = io_match_linked(head);
		spin_unlock_irq(&ctx->timeout_lock);
	} else {
		matched = io_match_linked(head);
	}
	return matched;
}

static inline void req_fail_link_node(struct io_kiocb *req, int res)
{
	req_set_fail(req);
	io_req_set_res(req, res, 0);
}

static inline void io_req_add_to_cache(struct io_kiocb *req, struct io_ring_ctx *ctx)
{
	wq_stack_add_head(&req->comp_list, &ctx->submit_state.free_list);
}

static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref)
{
	struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);

	complete(&ctx->ref_comp);
}

static __cold void io_fallback_req_func(struct work_struct *work)
{
	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
						fallback_work.work);
	struct llist_node *node = llist_del_all(&ctx->fallback_llist);
	struct io_kiocb *req, *tmp;
	struct io_tw_state ts = {};

	percpu_ref_get(&ctx->refs);
	mutex_lock(&ctx->uring_lock);
	llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
		req->io_task_work.func(req, &ts);
	io_submit_flush_completions(ctx);
	mutex_unlock(&ctx->uring_lock);
	percpu_ref_put(&ctx->refs);
}

static int io_alloc_hash_table(struct io_hash_table *table, unsigned bits)
{
	unsigned hash_buckets = 1U << bits;
	size_t hash_size = hash_buckets * sizeof(table->hbs[0]);

	table->hbs = kmalloc(hash_size, GFP_KERNEL);
	if (!table->hbs)
		return -ENOMEM;

	table->hash_bits = bits;
	init_hash_table(table, hash_buckets);
	return 0;
}

static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx;
	int hash_bits;
	bool ret;

	ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
	if (!ctx)
		return NULL;

	xa_init(&ctx->io_bl_xa);

	/*
	 * Use 5 bits less than the max cq entries, that should give us around
	 * 32 entries per hash list if totally full and uniformly spread, but
	 * don't keep too many buckets to not overconsume memory.
	 */
	hash_bits = ilog2(p->cq_entries) - 5;
	hash_bits = clamp(hash_bits, 1, 8);
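	/*
	 * Worked example (added commentary, not from the original source):
	 * with p->cq_entries == 4096, ilog2(4096) - 5 == 7, giving 128 hash
	 * buckets and about 4096 / 128 == 32 entries per bucket when full.
	 */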
	if (io_alloc_hash_table(&ctx->cancel_table, hash_bits))
		goto err;
	if (io_alloc_hash_table(&ctx->cancel_table_locked, hash_bits))
		goto err;
	if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
			    0, GFP_KERNEL))
		goto err;

	ctx->flags = p->flags;
	atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
	init_waitqueue_head(&ctx->sqo_sq_wait);
	INIT_LIST_HEAD(&ctx->sqd_list);
	INIT_LIST_HEAD(&ctx->cq_overflow_list);
	INIT_LIST_HEAD(&ctx->io_buffers_cache);
	ret = io_alloc_cache_init(&ctx->rsrc_node_cache, IO_NODE_ALLOC_CACHE_MAX,
			    sizeof(struct io_rsrc_node));
	ret |= io_alloc_cache_init(&ctx->apoll_cache, IO_POLL_ALLOC_CACHE_MAX,
			    sizeof(struct async_poll));
	ret |= io_alloc_cache_init(&ctx->netmsg_cache, IO_ALLOC_CACHE_MAX,
			    sizeof(struct io_async_msghdr));
	ret |= io_alloc_cache_init(&ctx->rw_cache, IO_ALLOC_CACHE_MAX,
			    sizeof(struct io_async_rw));
	ret |= io_alloc_cache_init(&ctx->uring_cache, IO_ALLOC_CACHE_MAX,
			    sizeof(struct uring_cache));
	ret |= io_futex_cache_init(ctx);
	if (ret)
		goto err;
	init_completion(&ctx->ref_comp);
	xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
	mutex_init(&ctx->uring_lock);
	init_waitqueue_head(&ctx->cq_wait);
	init_waitqueue_head(&ctx->poll_wq);
	init_waitqueue_head(&ctx->rsrc_quiesce_wq);
	spin_lock_init(&ctx->completion_lock);
	spin_lock_init(&ctx->timeout_lock);
	INIT_WQ_LIST(&ctx->iopoll_list);
	INIT_LIST_HEAD(&ctx->io_buffers_comp);
	INIT_LIST_HEAD(&ctx->defer_list);
	INIT_LIST_HEAD(&ctx->timeout_list);
	INIT_LIST_HEAD(&ctx->ltimeout_list);
	INIT_LIST_HEAD(&ctx->rsrc_ref_list);
	init_llist_head(&ctx->work_llist);
	INIT_LIST_HEAD(&ctx->tctx_list);
	ctx->submit_state.free_list.next = NULL;
	INIT_HLIST_HEAD(&ctx->waitid_list);
#ifdef CONFIG_FUTEX
	INIT_HLIST_HEAD(&ctx->futex_list);
#endif
	INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
	INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
	INIT_HLIST_HEAD(&ctx->cancelable_uring_cmd);
	io_napi_init(ctx);

	return ctx;
err:
	io_alloc_cache_free(&ctx->rsrc_node_cache, kfree);
	io_alloc_cache_free(&ctx->apoll_cache, kfree);
	io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
	io_alloc_cache_free(&ctx->rw_cache, io_rw_cache_free);
	io_alloc_cache_free(&ctx->uring_cache, kfree);
	io_futex_cache_free(ctx);
	kfree(ctx->cancel_table.hbs);
	kfree(ctx->cancel_table_locked.hbs);
	xa_destroy(&ctx->io_bl_xa);
	kfree(ctx);
	return NULL;
}

static void io_account_cq_overflow(struct io_ring_ctx *ctx)
{
	struct io_rings *r = ctx->rings;

	WRITE_ONCE(r->cq_overflow, READ_ONCE(r->cq_overflow) + 1);
	ctx->cq_extra--;
}

369
static bool req_need_defer(struct io_kiocb *req, u32 seq)
370
{
371 372
	if (unlikely(req->flags & REQ_F_IO_DRAIN)) {
		struct io_ring_ctx *ctx = req->ctx;
373

374
		return seq + READ_ONCE(ctx->cq_extra) != ctx->cached_cq_tail;
375
	}
376

377
	return false;
378 379
}

380 381 382 383
static void io_clean_op(struct io_kiocb *req)
{
	if (req->flags & REQ_F_BUFFER_SELECTED) {
		spin_lock(&req->ctx->completion_lock);
384
		io_kbuf_drop(req);
		spin_unlock(&req->ctx->completion_lock);
	}

	if (req->flags & REQ_F_NEED_CLEANUP) {
		const struct io_cold_def *def = &io_cold_defs[req->opcode];

		if (def->cleanup)
			def->cleanup(req);
	}
	if ((req->flags & REQ_F_POLLED) && req->apoll) {
		kfree(req->apoll->double_poll);
		kfree(req->apoll);
		req->apoll = NULL;
	}
	if (req->flags & REQ_F_INFLIGHT) {
		struct io_uring_task *tctx = req->task->io_uring;

		atomic_dec(&tctx->inflight_tracked);
	}
	if (req->flags & REQ_F_CREDS)
		put_cred(req->creds);
	if (req->flags & REQ_F_ASYNC_DATA) {
		kfree(req->async_data);
		req->async_data = NULL;
	}
	req->flags &= ~IO_REQ_CLEAN_FLAGS;
}

413 414 415 416
static inline void io_req_track_inflight(struct io_kiocb *req)
{
	if (!(req->flags & REQ_F_INFLIGHT)) {
		req->flags |= REQ_F_INFLIGHT;
417
		atomic_inc(&req->task->io_uring->inflight_tracked);
418 419 420
	}
}

421 422
static struct io_kiocb *__io_prep_linked_timeout(struct io_kiocb *req)
{
423 424 425
	if (WARN_ON_ONCE(!req->link))
		return NULL;

426 427
	req->flags &= ~REQ_F_ARM_LTIMEOUT;
	req->flags |= REQ_F_LINK_TIMEOUT;
428 429

	/* linked timeouts should have two refs once prep'ed */
430
	io_req_set_refcount(req);
431 432
	__io_req_set_refcount(req->link, 2);
	return req->link;
433 434 435 436
}

static inline struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
{
437
	if (likely(!(req->flags & REQ_F_ARM_LTIMEOUT)))
438 439 440 441
		return NULL;
	return __io_prep_linked_timeout(req);
}

static noinline void __io_arm_ltimeout(struct io_kiocb *req)
{
	io_queue_linked_timeout(__io_prep_linked_timeout(req));
}

static inline void io_arm_ltimeout(struct io_kiocb *req)
{
	if (unlikely(req->flags & REQ_F_ARM_LTIMEOUT))
		__io_arm_ltimeout(req);
}

453 454
static void io_prep_async_work(struct io_kiocb *req)
{
455
	const struct io_issue_def *def = &io_issue_defs[req->opcode];
456 457
	struct io_ring_ctx *ctx = req->ctx;

458 459
	if (!(req->flags & REQ_F_CREDS)) {
		req->flags |= REQ_F_CREDS;
460
		req->creds = get_current_cred();
461
	}
462

463 464
	req->work.list.next = NULL;
	req->work.flags = 0;
465 466 467
	if (req->flags & REQ_F_FORCE_ASYNC)
		req->work.flags |= IO_WQ_WORK_CONCURRENT;

468
	if (req->file && !(req->flags & REQ_F_FIXED_FILE))
469
		req->flags |= io_file_get_flags(req->file);
470

471
	if (req->file && (req->flags & REQ_F_ISREG)) {
		bool should_hash = def->hash_reg_file;

		/* don't serialize this request if the fs doesn't need it */
		if (should_hash && (req->file->f_flags & O_DIRECT) &&
		    (req->file->f_mode & FMODE_DIO_PARALLEL_WRITE))
			should_hash = false;
		if (should_hash || (ctx->flags & IORING_SETUP_IOPOLL))
479
			io_wq_hash_work(&req->work, file_inode(req->file));
480
	} else if (!req->file || !S_ISBLK(file_inode(req->file)->i_mode)) {
481 482 483
		if (def->unbound_nonreg_file)
			req->work.flags |= IO_WQ_WORK_UNBOUND;
	}
484
}
485

486
static void io_prep_async_link(struct io_kiocb *req)
487
{
488
	struct io_kiocb *cur;
489

490 491 492
	if (req->flags & REQ_F_LINK_TIMEOUT) {
		struct io_ring_ctx *ctx = req->ctx;

493
		spin_lock_irq(&ctx->timeout_lock);
494 495
		io_for_each_link(cur, req)
			io_prep_async_work(cur);
496
		spin_unlock_irq(&ctx->timeout_lock);
497 498 499 500
	} else {
		io_for_each_link(cur, req)
			io_prep_async_work(cur);
	}
501 502
}

503
static void io_queue_iowq(struct io_kiocb *req)
504
{
505
	struct io_kiocb *link = io_prep_linked_timeout(req);
506
	struct io_uring_task *tctx = req->task->io_uring;
507

508 509
	BUG_ON(!tctx);
	BUG_ON(!tctx->io_wq);
510

511 512
	/* init ->work of the whole link before punting */
	io_prep_async_link(req);

	/*
	 * Not expected to happen, but if we do have a bug where this _can_
	 * happen, catch it here and ensure the request is marked as
	 * canceled. That will make io-wq go through the usual work cancel
	 * procedure rather than attempt to run this request (or create a new
	 * worker for it).
	 */
	if (WARN_ON_ONCE(!same_thread_group(req->task, current)))
		req->work.flags |= IO_WQ_WORK_CANCEL;

524
	trace_io_uring_queue_async_work(req, io_wq_is_hashed(&req->work));
525
	io_wq_enqueue(tctx->io_wq, &req->work);
526 527
	if (link)
		io_queue_linked_timeout(link);
528 529
}

530
static __cold void io_queue_deferred(struct io_ring_ctx *ctx)
531
{
532
	while (!list_empty(&ctx->defer_list)) {
533 534
		struct io_defer_entry *de = list_first_entry(&ctx->defer_list,
						struct io_defer_entry, list);
535

536
		if (req_need_defer(de->req, de->seq))
537
			break;
538
		list_del_init(&de->list);
539
		io_req_task_queue(de->req);
540
		kfree(de);
541
	}
542 543
}

544
void io_eventfd_ops(struct rcu_head *rcu)
545
{
546
	struct io_ev_fd *ev_fd = container_of(rcu, struct io_ev_fd, rcu);
547
	int ops = atomic_xchg(&ev_fd->ops, 0);
548

549
	if (ops & BIT(IO_EVENTFD_OP_SIGNAL_BIT))
550
		eventfd_signal_mask(ev_fd->cq_ev_fd, EPOLL_URING_WAKE);
551

552 553 554
	/*
	 * IO_EVENTFD_OP_FREE_BIT may not be set here, depending on callback
	 * ordering in a race, but if the reference count is 0 we know we
	 * have to free it regardless.
	 */
556 557 558 559
	if (atomic_dec_and_test(&ev_fd->refs)) {
		eventfd_ctx_put(ev_fd->cq_ev_fd);
		kfree(ev_fd);
	}
560 561
}

562
static void io_eventfd_signal(struct io_ring_ctx *ctx)
563
{
564
	struct io_ev_fd *ev_fd = NULL;

	rcu_read_lock();
	/*
	 * rcu_dereference ctx->io_ev_fd once and use it for both checking
	 * and eventfd_signal
	 */
	ev_fd = rcu_dereference(ctx->io_ev_fd);

	/*
	 * Check again if ev_fd exists in case an io_eventfd_unregister call
	 * completed between the NULL check of ctx->io_ev_fd at the start of
	 * the function and rcu_read_lock.
	 */
	if (unlikely(!ev_fd))
		goto out;
580
	if (READ_ONCE(ctx->rings->cq_flags) & IORING_CQ_EVENTFD_DISABLED)
581
		goto out;
582 583
	if (ev_fd->eventfd_async && !io_wq_current_is_worker())
		goto out;
584

585
	if (likely(eventfd_signal_allowed())) {
586
		eventfd_signal_mask(ev_fd->cq_ev_fd, EPOLL_URING_WAKE);
587 588 589
	} else {
		atomic_inc(&ev_fd->refs);
		if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_SIGNAL_BIT), &ev_fd->ops))
590
			call_rcu_hurry(&ev_fd->rcu, io_eventfd_ops);
591 592 593 594
		else
			atomic_dec(&ev_fd->refs);
	}

595 596
out:
	rcu_read_unlock();
597 598
}

static void io_eventfd_flush_signal(struct io_ring_ctx *ctx)
{
	bool skip;

	spin_lock(&ctx->completion_lock);

	/*
	 * Eventfd should only get triggered when at least one event has been
	 * posted. Some applications rely on the eventfd notification count
	 * only changing IFF a new CQE has been added to the CQ ring. There's
	 * no dependency on a 1:1 relationship between how many times this
	 * function is called (and hence the eventfd count) and number of CQEs
	 * posted to the CQ ring.
	 */
	skip = ctx->cached_cq_tail == ctx->evfd_last_cq_tail;
	ctx->evfd_last_cq_tail = ctx->cached_cq_tail;
	spin_unlock(&ctx->completion_lock);
	if (skip)
		return;

	io_eventfd_signal(ctx);
}

622 623
void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
{
624 625
	if (ctx->poll_activated)
		io_poll_wq_wake(ctx);
626 627 628
	if (ctx->off_timeout_used)
		io_flush_timeouts(ctx);
	if (ctx->drain_active) {
629
		spin_lock(&ctx->completion_lock);
630
		io_queue_deferred(ctx);
631 632 633
		spin_unlock(&ctx->completion_lock);
	}
	if (ctx->has_evfd)
634
		io_eventfd_flush_signal(ctx);
635 636
}

637 638
static inline void __io_cq_lock(struct io_ring_ctx *ctx)
{
639
	if (!ctx->lockless_cq)
640 641 642
		spin_lock(&ctx->completion_lock);
}

static inline void io_cq_lock(struct io_ring_ctx *ctx)
	__acquires(ctx->completion_lock)
{
	spin_lock(&ctx->completion_lock);
}

649
static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
650 651
{
	io_commit_cqring(ctx);
652
	if (!ctx->task_complete) {
653 654 655 656 657
		if (!ctx->lockless_cq)
			spin_unlock(&ctx->completion_lock);
		/* IOPOLL rings only need to wake up if it's also SQPOLL */
		if (!ctx->syscall_iopoll)
			io_cqring_wake(ctx);
658
	}
659
	io_commit_cqring_flush(ctx);
660 661
}

662
static void io_cq_unlock_post(struct io_ring_ctx *ctx)
663
	__releases(ctx->completion_lock)
664
{
665 666 667
	io_commit_cqring(ctx);
	spin_unlock(&ctx->completion_lock);
	io_cqring_wake(ctx);
668
	io_commit_cqring_flush(ctx);
669 670
}

671
static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool dying)
672
{
673
	size_t cqe_size = sizeof(struct io_uring_cqe);
674

675 676
	lockdep_assert_held(&ctx->uring_lock);

677 678
	/* don't abort if we're dying, entries must get freed */
	if (!dying && __io_cqring_events(ctx) == ctx->cq_entries)
679
		return;
680

681 682 683
	if (ctx->flags & IORING_SETUP_CQE32)
		cqe_size <<= 1;

684
	io_cq_lock(ctx);
685
	while (!list_empty(&ctx->cq_overflow_list)) {
686
		struct io_uring_cqe *cqe;
687
		struct io_overflow_cqe *ocqe;
688

689 690
		ocqe = list_first_entry(&ctx->cq_overflow_list,
					struct io_overflow_cqe, list);

		if (!dying) {
			if (!io_get_cqe_overflow(ctx, &cqe, true))
				break;
			memcpy(cqe, &ocqe->cqe, cqe_size);
		}
697 698
		list_del(&ocqe->list);
		kfree(ocqe);
699 700
	}

701
	if (list_empty(&ctx->cq_overflow_list)) {
702
		clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
703
		atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
704
	}
705
	io_cq_unlock_post(ctx);
706 707
}

static void io_cqring_overflow_kill(struct io_ring_ctx *ctx)
{
	if (ctx->rings)
		__io_cqring_overflow_flush(ctx, true);
}

714 715
static void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx)
{
716
	mutex_lock(&ctx->uring_lock);
717
	__io_cqring_overflow_flush(ctx, false);
718
	mutex_unlock(&ctx->uring_lock);
719 720
}

721
/* can be called by any task */
722
static void io_put_task_remote(struct task_struct *task)
723 724 725
{
	struct io_uring_task *tctx = task->io_uring;

726
	percpu_counter_sub(&tctx->inflight, 1);
727
	if (unlikely(atomic_read(&tctx->in_cancel)))
728
		wake_up(&tctx->wait);
729
	put_task_struct(task);
730 731
}

732
/* used by a task to put its own references */
733
static void io_put_task_local(struct task_struct *task)
734
{
735
	task->io_uring->cached_refs++;
736 737
}

738
/* must be called shortly after putting a request */
739
static inline void io_put_task(struct task_struct *task)
740 741
{
	if (likely(task == current))
742
		io_put_task_local(task);
743
	else
744
		io_put_task_remote(task);
745 746
}

747
void io_task_refs_refill(struct io_uring_task *tctx)
{
	unsigned int refill = -tctx->cached_refs + IO_TCTX_REFS_CACHE_NR;

	percpu_counter_add(&tctx->inflight, refill);
	refcount_add(refill, &current->usage);
	tctx->cached_refs += refill;
}

static __cold void io_uring_drop_tctx_refs(struct task_struct *task)
{
	struct io_uring_task *tctx = task->io_uring;
	unsigned int refs = tctx->cached_refs;

	if (refs) {
		tctx->cached_refs = 0;
		percpu_counter_sub(&tctx->inflight, refs);
		put_task_struct_many(task, refs);
	}
}

768 769
static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data,
				     s32 res, u32 cflags, u64 extra1, u64 extra2)
{
771
	struct io_overflow_cqe *ocqe;
772 773
	size_t ocq_size = sizeof(struct io_overflow_cqe);
	bool is_cqe32 = (ctx->flags & IORING_SETUP_CQE32);

775 776
	lockdep_assert_held(&ctx->completion_lock);

777 778
	if (is_cqe32)
		ocq_size += sizeof(struct io_uring_cqe);

780
	ocqe = kmalloc(ocq_size, GFP_ATOMIC | __GFP_ACCOUNT);
781
	trace_io_uring_cqe_overflow(ctx, user_data, res, cflags, ocqe);
	if (!ocqe) {
		/*
		 * If we're in ring overflow flush mode, or in task cancel mode,
		 * or cannot allocate an overflow entry, then we need to drop it
		 * on the floor.
		 */
788
		io_account_cq_overflow(ctx);
789
		set_bit(IO_CHECK_CQ_DROPPED_BIT, &ctx->check_cq);
790
		return false;
	}
792
	if (list_empty(&ctx->cq_overflow_list)) {
793
		set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
794
		atomic_or(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
795

796
	}
797
	ocqe->cqe.user_data = user_data;
798 799
	ocqe->cqe.res = res;
	ocqe->cqe.flags = cflags;
800 801 802 803
	if (is_cqe32) {
		ocqe->cqe.big_cqe[0] = extra1;
		ocqe->cqe.big_cqe[1] = extra2;
	}
804 805
	list_add_tail(&ocqe->list, &ctx->cq_overflow_list);
	return true;
}

808
static void io_req_cqe_overflow(struct io_kiocb *req)
809
{
810 811
	io_cqring_event_overflow(req->ctx, req->cqe.user_data,
				req->cqe.res, req->cqe.flags,
812 813
				req->big_cqe.extra1, req->big_cqe.extra2);
	memset(&req->big_cqe, 0, sizeof(req->big_cqe));
814 815
}

816 817 818 819 820
/*
 * writes to the cq entry need to come after reading head; the
 * control dependency is enough as we're using WRITE_ONCE to
 * fill the cq entry
 */
821
bool io_cqe_cache_refill(struct io_ring_ctx *ctx, bool overflow)
822 823 824 825 826
{
	struct io_rings *rings = ctx->rings;
	unsigned int off = ctx->cached_cq_tail & (ctx->cq_entries - 1);
	unsigned int free, queued, len;

	/*
	 * Posting into the CQ when there are pending overflowed CQEs may break
	 * ordering guarantees, which will affect links, F_MORE users and more.
	 * Force overflow the completion.
	 */
	if (!overflow && (ctx->check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)))
833
		return false;

	/* userspace may cheat modifying the tail, be safe and do min */
	queued = min(__io_cqring_events(ctx), ctx->cq_entries);
	free = ctx->cq_entries - queued;
	/* we need a contiguous range, limit based on the current array offset */
	len = min(free, ctx->cq_entries - off);
	if (!len)
841
		return false;
842

843 844 845 846 847
	if (ctx->flags & IORING_SETUP_CQE32) {
		off <<= 1;
		len <<= 1;
	}

848 849
	ctx->cqe_cached = &rings->cqes[off];
	ctx->cqe_sentinel = ctx->cqe_cached + len;
850
	return true;
851 852
}

853 854
static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res,
			      u32 cflags)
855
{
856 857
	struct io_uring_cqe *cqe;

858
	ctx->cq_extra++;

	/*
	 * If we can't get a cq entry, userspace overflowed the
	 * submission (by quite a lot). Increment the overflow count in
	 * the ring.
	 */
865
	if (likely(io_get_cqe(ctx, &cqe))) {
866 867
		trace_io_uring_complete(ctx, NULL, user_data, res, cflags, 0, 0);

868 869 870
		WRITE_ONCE(cqe->user_data, user_data);
		WRITE_ONCE(cqe->res, res);
		WRITE_ONCE(cqe->flags, cflags);
871 872 873 874 875

		if (ctx->flags & IORING_SETUP_CQE32) {
			WRITE_ONCE(cqe->big_cqe[0], 0);
			WRITE_ONCE(cqe->big_cqe[1], 0);
		}
876 877
		return true;
	}
878
	return false;
879 880
}

881
bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags)
882 883 884
{
	bool filled;

885
	io_cq_lock(ctx);
886
	filled = io_fill_cqe_aux(ctx, user_data, res, cflags);
887
	if (!filled)
888 889
		filled = io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);

890
	io_cq_unlock_post(ctx);
891 892 893
	return filled;
}

894 895 896 897
/*
 * A helper for multishot requests posting additional CQEs.
 * Should only be used from a task_work including IO_URING_F_MULTISHOT.
 */
898
bool io_req_post_cqe(struct io_kiocb *req, s32 res, u32 cflags)
{
900
	struct io_ring_ctx *ctx = req->ctx;
901
	bool posted;
902

903
	lockdep_assert(!io_wq_current_is_worker());
904 905
	lockdep_assert_held(&ctx->uring_lock);

906 907 908 909 910
	__io_cq_lock(ctx);
	posted = io_fill_cqe_aux(ctx, req->cqe.user_data, res, cflags);
	ctx->submit_state.cq_flush = true;
	__io_cq_unlock_post(ctx);
	return posted;
911 912
}

913
static void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
{
915 916
	struct io_ring_ctx *ctx = req->ctx;

	/*
	 * All execution paths but io-wq use the deferred completions by
	 * passing IO_URING_F_COMPLETE_DEFER and thus should not end up here.
	 */
	if (WARN_ON_ONCE(!(issue_flags & IO_URING_F_IOWQ)))
		return;

	/*
	 * Handle special CQ sync cases via task_work. DEFER_TASKRUN requires
	 * the submitter task context, IOPOLL protects with uring_lock.
	 */
	if (ctx->task_complete || (ctx->flags & IORING_SETUP_IOPOLL)) {
		req->io_task_work.func = io_req_task_complete;
		io_req_task_work_add(req);
		return;
	}

934
	io_cq_lock(ctx);
935 936 937 938
	if (!(req->flags & REQ_F_CQE_SKIP)) {
		if (!io_fill_cqe_req(ctx, req))
			io_req_cqe_overflow(req);
	}
939
	io_cq_unlock_post(ctx);
940

941 942 943 944
	/*
	 * We don't free the request here because we know it's called from
	 * io-wq only, which holds a reference, so it cannot be the last put.
	 */
945
	req_ref_put(req);
946 947
}

948
void io_req_defer_failed(struct io_kiocb *req, s32 res)
949
	__must_hold(&ctx->uring_lock)
950
{
951
	const struct io_cold_def *def = &io_cold_defs[req->opcode];
952

953 954
	lockdep_assert_held(&req->ctx->uring_lock);

955
	req_set_fail(req);
956
	io_req_set_res(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED));
957 958
	if (def->fail)
		def->fail(req);
959
	io_req_complete_defer(req);
960 961
}

/*
 * Don't initialise the fields below on every allocation, but do that in
 * advance and keep them valid across allocations.
 */
static void io_preinit_req(struct io_kiocb *req, struct io_ring_ctx *ctx)
{
	req->ctx = ctx;
	req->link = NULL;
	req->async_data = NULL;
	/* not necessary, but safer to zero */
972
	memset(&req->cqe, 0, sizeof(req->cqe));
973
	memset(&req->big_cqe, 0, sizeof(req->big_cqe));
974 975
}

/*
 * A request might get retired back into the request caches even before opcode
 * handlers and io_issue_sqe() are done with it, e.g. inline completion path.
 * Because of that, io_alloc_req() should be called only under ->uring_lock
 * and with extra caution to not get a request that is still worked on.
 */
982
__cold bool __io_alloc_req_refill(struct io_ring_ctx *ctx)
983
	__must_hold(&ctx->uring_lock)
{
985
	gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
986
	void *reqs[IO_REQ_ALLOC_BATCH];
987
	int ret;
988

989
	ret = kmem_cache_alloc_bulk(req_cachep, gfp, ARRAY_SIZE(reqs), reqs);
990

991 992 993 994 995
	/*
	 * Bulk alloc is all-or-nothing. If we fail to get a batch,
	 * retry single alloc to be on the safe side.
	 */
	if (unlikely(ret <= 0)) {
996 997
		reqs[0] = kmem_cache_alloc(req_cachep, gfp);
		if (!reqs[0])
998
			return false;
999
		ret = 1;
	}
1001

1002
	percpu_ref_get_many(&ctx->refs, ret);
1003 1004
	while (ret--) {
		struct io_kiocb *req = reqs[ret];
1005 1006

		io_preinit_req(req, ctx);
1007
		io_req_add_to_cache(req, ctx);
1008
	}
1009 1010 1011
	return true;
}

1012 1013
__cold void io_free_req(struct io_kiocb *req)
{
1014 1015 1016 1017 1018
	/* refs were already put, restore them for io_req_task_complete() */
	req->flags &= ~REQ_F_REFCOUNT;
	/* we only want to free it, don't post CQEs */
	req->flags |= REQ_F_CQE_SKIP;
	req->io_task_work.func = io_req_task_complete;
1019 1020 1021
	io_req_task_work_add(req);
}

1022 1023 1024 1025
static void __io_req_find_next_prep(struct io_kiocb *req)
{
	struct io_ring_ctx *ctx = req->ctx;

1026
	spin_lock(&ctx->completion_lock);
1027
	io_disarm_next(req);
1028
	spin_unlock(&ctx->completion_lock);
1029 1030 1031
}

static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
1032
{
1033
	struct io_kiocb *nxt;
1034

	/*
	 * If LINK is set, we have dependent requests in this chain. If we
	 * didn't fail this request, queue the first one up, moving any other
	 * dependencies to the next request. In case of failure, fail the rest
	 * of the chain.
	 */
1041 1042
	if (unlikely(req->flags & IO_DISARM_MASK))
		__io_req_find_next_prep(req);
1043 1044 1045
	nxt = req->link;
	req->link = NULL;
	return nxt;
1046
}
1047

1048
static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
1049 1050 1051
{
	if (!ctx)
		return;
1052 1053
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1054 1055 1056

	io_submit_flush_completions(ctx);
	mutex_unlock(&ctx->uring_lock);
1057 1058 1059
	percpu_ref_put(&ctx->refs);
}

/*
 * Run queued task_work, returning the number of entries processed in *count.
 * If more entries than max_entries are available, stop processing once this
 * is reached and return the rest of the list.
 */
struct llist_node *io_handle_tw_list(struct llist_node *node,
				     unsigned int *count,
				     unsigned int max_entries)
1068
{
1069 1070
	struct io_ring_ctx *ctx = NULL;
	struct io_tw_state ts = { };
1071

1072
	do {
1073
		struct llist_node *next = node->next;
1074 1075 1076
		struct io_kiocb *req = container_of(node, struct io_kiocb,
						    io_task_work.node);

1077 1078 1079
		if (req->ctx != ctx) {
			ctx_flush_and_put(ctx, &ts);
			ctx = req->ctx;
1080
			mutex_lock(&ctx->uring_lock);
1081
			percpu_ref_get(&ctx->refs);
1082
		}
1083 1084
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
1085
				req, &ts);
1086
		node = next;
1087
		(*count)++;
1088
		if (unlikely(need_resched())) {
1089 1090
			ctx_flush_and_put(ctx, &ts);
			ctx = NULL;
1091 1092
			cond_resched();
		}
1093
	} while (node && *count < max_entries);
1094

1095
	ctx_flush_and_put(ctx, &ts);
1096
	return node;
1097 1098
}

/**
 * io_llist_xchg - swap all entries in a lock-less list
 * @head:	the head of lock-less list to delete all entries
 * @new:	new entry as the head of the list
 *
 * If list is empty, return NULL, otherwise, return the pointer to the first entry.
 * The order of entries returned is from the newest to the oldest added one.
 */
static inline struct llist_node *io_llist_xchg(struct llist_head *head,
					       struct llist_node *new)
{
	return xchg(&head->first, new);
}

1113
static __cold void io_fallback_tw(struct io_uring_task *tctx, bool sync)
1114 1115
{
	struct llist_node *node = llist_del_all(&tctx->task_list);
1116
	struct io_ring_ctx *last_ctx = NULL;
1117 1118 1119 1120 1121
	struct io_kiocb *req;

	while (node) {
		req = container_of(node, struct io_kiocb, io_task_work.node);
		node = node->next;
		if (sync && last_ctx != req->ctx) {
			if (last_ctx) {
				flush_delayed_work(&last_ctx->fallback_work);
				percpu_ref_put(&last_ctx->refs);
			}
			last_ctx = req->ctx;
			percpu_ref_get(&last_ctx->refs);
		}
1130 1131 1132 1133
		if (llist_add(&req->io_task_work.node,
			      &req->ctx->fallback_llist))
			schedule_delayed_work(&req->ctx->fallback_work, 1);
	}
1134 1135 1136 1137 1138

	if (last_ctx) {
		flush_delayed_work(&last_ctx->fallback_work);
		percpu_ref_put(&last_ctx->refs);
	}
1139 1140
}

1141 1142 1143
struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
				      unsigned int max_entries,
				      unsigned int *count)
1144
{
1145
	struct llist_node *node;

1147
	if (unlikely(current->flags & PF_EXITING)) {
1148
		io_fallback_tw(tctx, true);
1149
		return NULL;
1150
	}

1152
	node = llist_del_all(&tctx->task_list);
1153 1154 1155 1156
	if (node) {
		node = llist_reverse_order(node);
		node = io_handle_tw_list(node, count, max_entries);
	}
1157

1158 1159
	/* relaxed read is enough as only the task itself sets ->in_cancel */
	if (unlikely(atomic_read(&tctx->in_cancel)))
1160
		io_uring_drop_tctx_refs(current);
1161

	trace_io_uring_task_work_run(tctx, *count);
	return node;
}

void tctx_task_work(struct callback_head *cb)
{
	struct io_uring_task *tctx;
	struct llist_node *ret;
	unsigned int count = 0;

	tctx = container_of(cb, struct io_uring_task, task_work);
	ret = tctx_task_work_run(tctx, UINT_MAX, &count);
	/* can't happen */
	WARN_ON_ONCE(ret);
1176 1177
}

1178
static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
1179 1180
{
	struct io_ring_ctx *ctx = req->ctx;
1181
	unsigned nr_wait, nr_tw, nr_tw_prev;
1182
	struct llist_node *head;
1183

1184 1185
	/* See comment above IO_CQ_WAKE_INIT */
	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
1186

1187 1188 1189 1190
	/*
	 * We don't know how many requests are in the link and whether they
	 * can even be queued lazily, so fall back to non-lazy.
	 */
1191 1192
	if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
		flags &= ~IOU_F_TWQ_LAZY_WAKE;
1193

1194
	head = READ_ONCE(ctx->work_llist.first);
1195
	do {
1196
		nr_tw_prev = 0;
1197 1198
		if (head) {
			struct io_kiocb *first_req = container_of(head,
							struct io_kiocb,
							io_task_work.node);
			/*
			 * Might be executed at any moment, rely on
			 * SLAB_TYPESAFE_BY_RCU to keep it alive.
			 */
			nr_tw_prev = READ_ONCE(first_req->nr_tw);
		}
1207 1208 1209 1210 1211

		/*
		 * Theoretically, it can overflow, but that's fine as one of the
		 * previous adds should've tried to wake the task.
		 */
1212 1213
		nr_tw = nr_tw_prev + 1;
		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
1214
			nr_tw = IO_CQ_WAKE_FORCE;
1215 1216

		req->nr_tw = nr_tw;
1217 1218
		req->io_task_work.node.next = head;
	} while (!try_cmpxchg(&ctx->work_llist.first, &head,
1219 1220
			      &req->io_task_work.node));

	/*
	 * cmpxchg implies a full barrier, which pairs with the barrier
	 * in set_current_state() on the io_cqring_wait() side. It's used
	 * to ensure that either we see updated ->cq_wait_nr, or waiters
	 * going to sleep will observe the work added to the list, which
	 * is similar to the wait/wake task state sync.
	 */

1229
	if (!head) {
1230 1231 1232 1233
		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
			atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
		if (ctx->has_evfd)
			io_eventfd_signal(ctx);
1234 1235
	}

1236
	nr_wait = atomic_read(&ctx->cq_wait_nr);
1237 1238
	/* not enough or no one is waiting */
	if (nr_tw < nr_wait)
1239
		return;
1240 1241
	/* the previous add has already woken it up */
	if (nr_tw_prev >= nr_wait)
1242 1243
		return;
	wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
1244 1245
}

1246
static void io_req_normal_work_add(struct io_kiocb *req)
1247
{
1248
	struct io_uring_task *tctx = req->task->io_uring;
1249
	struct io_ring_ctx *ctx = req->ctx;
1250 1251

	/* task_work already pending, we're done */
1252
	if (!llist_add(&req->io_task_work.node, &tctx->task_list))
1253
		return;
1254

1255 1256 1257
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

1258
	/* SQPOLL doesn't need the task_work added, it'll run it itself */
1259 1260 1261 1262 1263
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		struct io_sq_data *sqd = ctx->sq_data;

		if (wq_has_sleeper(&sqd->wait))
			wake_up(&sqd->wait);
1264
		return;
1265
	}
1266

1267
	if (likely(!task_work_add(req->task, &tctx->task_work, ctx->notify_method)))
1268
		return;
1269

1270
	io_fallback_tw(tctx, false);
1271 1272
}

void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
{
	if (req->ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
		rcu_read_lock();
		io_req_local_work_add(req, flags);
		rcu_read_unlock();
	} else {
		io_req_normal_work_add(req);
	}
}

static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
{
	struct llist_node *node;

	node = llist_del_all(&ctx->work_llist);
	while (node) {
		struct io_kiocb *req = container_of(node, struct io_kiocb,
						    io_task_work.node);

		node = node->next;
1294
		io_req_normal_work_add(req);
1295 1296 1297
	}
}

static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
				       int min_events)
{
	if (llist_empty(&ctx->work_llist))
		return false;
	if (events < min_events)
		return true;
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
	return false;
}

static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
			       int min_events)
1312 1313
{
	struct llist_node *node;
1314
	unsigned int loops = 0;
1315
	int ret = 0;
1316

1317
	if (WARN_ON_ONCE(ctx->submitter_task != current))
1318
		return -EEXIST;
1319 1320
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1321
again:
1322 1323 1324 1325 1326
	/*
	 * llists are in reverse order, flip it back the right way before
	 * running the pending items.
	 */
	node = llist_reverse_order(io_llist_xchg(&ctx->work_llist, NULL));
1327
	while (node) {
1328 1329 1330
		struct llist_node *next = node->next;
		struct io_kiocb *req = container_of(node, struct io_kiocb,
						    io_task_work.node);
1331 1332 1333
		INDIRECT_CALL_2(req->io_task_work.func,
				io_poll_task_func, io_req_rw_complete,
				req, ts);
1334 1335 1336
		ret++;
		node = next;
	}
1337
	loops++;
1338

1339
	if (io_run_local_work_continue(ctx, ret, min_events))
1340
		goto again;
1341 1342 1343
	io_submit_flush_completions(ctx);
	if (io_run_local_work_continue(ctx, ret, min_events))
		goto again;
1344

1345
	trace_io_uring_local_work_run(ctx, ret, loops);
1346
	return ret;
1347 1348
}

1349 1350
static inline int io_run_local_work_locked(struct io_ring_ctx *ctx,
					   int min_events)
1351
{
1352
	struct io_tw_state ts = {};
1353 1354 1355

	if (llist_empty(&ctx->work_llist))
		return 0;
1356
	return __io_run_local_work(ctx, &ts, min_events);
1357 1358
}

1359
static int io_run_local_work(struct io_ring_ctx *ctx, int min_events)
1360
{
1361
	struct io_tw_state ts = {};
1362 1363
	int ret;

1364
	mutex_lock(&ctx->uring_lock);
1365
	ret = __io_run_local_work(ctx, &ts, min_events);
1366
	mutex_unlock(&ctx->uring_lock);
1367
	return ret;
1368 1369
}

1370
static void io_req_task_cancel(struct io_kiocb *req, struct io_tw_state *ts)
1371
{
1372
	io_tw_lock(req->ctx, ts);
1373
	io_req_defer_failed(req, req->cqe.res);
1374 1375
}

1376
void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts)
1377
{
1378
	io_tw_lock(req->ctx, ts);
1379
	/* req->task == current here, checking PF_EXITING is safe */
1380
	if (unlikely(req->task->flags & PF_EXITING))
1381
		io_req_defer_failed(req, -EFAULT);
1382
	else if (req->flags & REQ_F_FORCE_ASYNC)
1383
		io_queue_iowq(req);
1384 1385
	else
		io_queue_sqe(req);
1386 1387
}

1388
void io_req_task_queue_fail(struct io_kiocb *req, int ret)
1389
{
1390
	io_req_set_res(req, ret, 0);
1391
	req->io_task_work.func = io_req_task_cancel;
1392
	io_req_task_work_add(req);
1393 1394
}

1395
void io_req_task_queue(struct io_kiocb *req)
1396
{
1397
	req->io_task_work.func = io_req_task_submit;
1398
	io_req_task_work_add(req);
1399 1400
}

1401
void io_queue_next(struct io_kiocb *req)
1402
{
1403
	struct io_kiocb *nxt = io_req_find_next(req);
1404 1405

	if (nxt)
1406
		io_req_task_queue(nxt);
1407 1408
}

1409 1410
static void io_free_batch_list(struct io_ring_ctx *ctx,
			       struct io_wq_work_node *node)
1411
	__must_hold(&ctx->uring_lock)
1412
{
1413 1414 1415
	do {
		struct io_kiocb *req = container_of(node, struct io_kiocb,
						    comp_list);
1416

		if (unlikely(req->flags & IO_REQ_CLEAN_SLOW_FLAGS)) {
			if (req->flags & REQ_F_REFCOUNT) {
				node = req->comp_list.next;
				if (!req_ref_put_and_test(req))
					continue;
			}
1423 1424 1425 1426 1427
			if ((req->flags & REQ_F_POLLED) && req->apoll) {
				struct async_poll *apoll = req->apoll;

				if (apoll->double_poll)
					kfree(apoll->double_poll);
1428
				if (!io_alloc_cache_put(&ctx->apoll_cache, apoll))
1429
					kfree(apoll);
1430 1431
				req->flags &= ~REQ_F_POLLED;
			}
1432
			if (req->flags & IO_REQ_LINK_FLAGS)
1433
				io_queue_next(req);
1434 1435
			if (unlikely(req->flags & IO_REQ_CLEAN_FLAGS))
				io_clean_op(req);
1436
		}
1437
		io_put_file(req);
1438
		io_put_rsrc_node(ctx, req->rsrc_node);
1439
		io_put_task(req->task);
1440

1441
		node = req->comp_list.next;
1442
		io_req_add_to_cache(req, ctx);
1443
	} while (node);
1444 1445
}

1446
void __io_submit_flush_completions(struct io_ring_ctx *ctx)
1447
	__must_hold(&ctx->uring_lock)
1448
{
1449
	struct io_submit_state *state = &ctx->submit_state;
1450
	struct io_wq_work_node *node;
1451

1452
	__io_cq_lock(ctx);
1453
	__wq_list_for_each(node, &state->compl_reqs) {
1454 1455
		struct io_kiocb *req = container_of(node, struct io_kiocb,
					    comp_list);
1456

1457
		if (!(req->flags & REQ_F_CQE_SKIP) &&
1458
		    unlikely(!io_fill_cqe_req(ctx, req))) {
1459
			if (ctx->lockless_cq) {
				spin_lock(&ctx->completion_lock);
				io_req_cqe_overflow(req);
				spin_unlock(&ctx->completion_lock);
			} else {
				io_req_cqe_overflow(req);
			}
		}
1467
	}
1468
	__io_cq_unlock_post(ctx);
1469

1470 1471 1472 1473
	if (!wq_list_empty(&ctx->submit_state.compl_reqs)) {
		io_free_batch_list(ctx, state->compl_reqs.first);
		INIT_WQ_LIST(&state->compl_reqs);
	}
1474
	ctx->submit_state.cq_flush = false;
1475 1476
}

1477
static unsigned io_cqring_events(struct io_ring_ctx *ctx)
1478 1479 1480
{
	/* See comment at the top of this file */
	smp_rmb();
1481
	return __io_cqring_events(ctx);
1482 1483
}

1484 1485 1486 1487
/*
 * We can't just wait for polled events to come to us, we have to actively
 * find and complete them.
 */
1488
static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
1489 1490 1491 1492 1493
{
	if (!(ctx->flags & IORING_SETUP_IOPOLL))
		return;

	mutex_lock(&ctx->uring_lock);
1494
	while (!wq_list_empty(&ctx->iopoll_list)) {
1495
		/* let it sleep and repeat later if can't complete a request */
1496
		if (io_do_iopoll(ctx, true) == 0)
1497
			break;
1498 1499 1500
		/*
		 * Ensure we allow local-to-the-cpu processing to take place;
		 * in this case we need to ensure that we reap all events.
		 * Also let task_work, etc. make progress by releasing the mutex.
		 */
1503 1504 1505 1506 1507
		if (need_resched()) {
			mutex_unlock(&ctx->uring_lock);
			cond_resched();
			mutex_lock(&ctx->uring_lock);
		}
1508 1509 1510 1511
	}
	mutex_unlock(&ctx->uring_lock);
}

1512
static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
1513
{
1514
	unsigned int nr_events = 0;
1515
	unsigned long check_cq;
1516

1517 1518
	lockdep_assert_held(&ctx->uring_lock);

1519 1520 1521
	if (!io_allowed_run_tw(ctx))
		return -EEXIST;

1522 1523 1524
	check_cq = READ_ONCE(ctx->check_cq);
	if (unlikely(check_cq)) {
		if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
1525
			__io_cqring_overflow_flush(ctx, false);
		/*
		 * Similarly do not spin if we have not informed the user of any
		 * dropped CQE.
		 */
		if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT))
			return -EBADR;
	}
	/*
	 * Don't enter poll loop if we already have events pending.
	 * If we do, we can potentially be spinning for commands that
	 * already triggered a CQE (eg in error).
	 */
	if (io_cqring_events(ctx))
1539
		return 0;
1540

1541
	do {
1542 1543
		int ret = 0;

		/*
		 * If a submit got punted to a workqueue, we can have the
		 * application entering polling for a command before it gets
		 * issued. That app will hold the uring_lock for the duration
		 * of the poll right here, so we need to take a breather every
		 * now and then to ensure that the issue has a chance to add
		 * the poll to the issued list. Otherwise we can spin here
		 * forever, while the workqueue is stuck trying to acquire the
		 * very same mutex.
		 */
1554 1555
		if (wq_list_empty(&ctx->iopoll_list) ||
		    io_task_work_pending(ctx)) {
1556 1557
			u32 tail = ctx->cached_cq_tail;

1558
			(void) io_run_local_work_locked(ctx, min);
1559

1560 1561 1562
			if (task_work_pending(current) ||
			    wq_list_empty(&ctx->iopoll_list)) {
				mutex_unlock(&ctx->uring_lock);
1563
				io_run_task_work();
1564 1565
				mutex_lock(&ctx->uring_lock);
			}
1566 1567
			/* some requests don't go through iopoll_list */
			if (tail != ctx->cached_cq_tail ||
1568
			    wq_list_empty(&ctx->iopoll_list))
1569
				break;
1570
		}
1571
		ret = io_do_iopoll(ctx, !min);
1572 1573
		if (unlikely(ret < 0))
			return ret;
1574 1575 1576

		if (task_sigpending(current))
			return -EINTR;
1577
		if (need_resched())
1578
			break;
1579

1580
		nr_events += ret;
1581
	} while (nr_events < min);
1582

1583
	return 0;
1584
}
1585

1586
void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts)
1587
{
1588
	io_req_complete_defer(req);
1589 1590
}

1591 1592 1593
/*
 * After the iocb has been issued, it's safe to be found on the poll list.
 * Adding the kiocb to the list AFTER submission ensures that we don't
 * find it from an io_do_iopoll() thread before the issuer is done
 * accessing the kiocb cookie.
 */
1597
static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
1598 1599
{
	struct io_ring_ctx *ctx = req->ctx;
1600
	const bool needs_lock = issue_flags & IO_URING_F_UNLOCKED;
1601 1602

	/* workqueue context doesn't hold uring_lock, grab it now */
1603
	if (unlikely(needs_lock))
1604
		mutex_lock(&ctx->uring_lock);

	/*
	 * Track whether we have multiple files in our lists. This will impact
	 * how we do polling eventually, not spinning if we're on potentially
	 * different devices.
	 */
1611
	if (wq_list_empty(&ctx->iopoll_list)) {
1612 1613
		ctx->poll_multi_queue = false;
	} else if (!ctx->poll_multi_queue) {
1614 1615
		struct io_kiocb *list_req;

1616 1617
		list_req = container_of(ctx->iopoll_list.first, struct io_kiocb,
					comp_list);
1618
		if (list_req->file != req->file)
1619
			ctx->poll_multi_queue = true;
	}

	/*
	 * For fast devices, IO may have already completed. If it has, add
	 * it to the front so we find it first.
	 */
1626
	if (READ_ONCE(req->iopoll_completed))
1627
		wq_list_add_head(&req->comp_list, &ctx->iopoll_list);
1628
	else
1629
		wq_list_add_tail(&req->comp_list, &ctx->iopoll_list);
1630

1631
	if (unlikely(needs_lock)) {
		/*
		 * If IORING_SETUP_SQPOLL is enabled, sqes are either handled in
		 * the sq thread task context or in an io worker task context. If
		 * the current task context is the sq thread, we don't need to
		 * check whether we should wake up the sq thread.
		 */
		if ((ctx->flags & IORING_SETUP_SQPOLL) &&
		    wq_has_sleeper(&ctx->sq_data->wait))
			wake_up(&ctx->sq_data->wait);

		mutex_unlock(&ctx->uring_lock);
	}
1644 1645
}

1646
io_req_flags_t io_file_get_flags(struct file *file)
1647
{
1648
	io_req_flags_t res = 0;
1649

1650
	if (S_ISREG(file_inode(file)->i_mode))
1651
		res |= REQ_F_ISREG;
1652
	if ((file->f_flags & O_NONBLOCK) || (file->f_mode & FMODE_NOWAIT))
1653
		res |= REQ_F_SUPPORT_NOWAIT;
1654
	return res;
}

1657
bool io_alloc_async_data(struct io_kiocb *req)
1658
{
	const struct io_issue_def *def = &io_issue_defs[req->opcode];

	WARN_ON_ONCE(!def->async_size);
	req->async_data = kmalloc(def->async_size, GFP_KERNEL);
1663 1664 1665 1666 1667
	if (req->async_data) {
		req->flags |= REQ_F_ASYNC_DATA;
		return false;
	}
	return true;
1668 1669
}

1670 1671
static u32 io_get_sequence(struct io_kiocb *req)
{
1672
	u32 seq = req->ctx->cached_sq_head;
1673
	struct io_kiocb *cur;
1674

1675
	/* need original cached_sq_head, but it was increased for each req */
1676
	io_for_each_link(cur, req)
1677 1678
		seq--;
	return seq;
1679 1680
}

1681
static __cold void io_drain_req(struct io_kiocb *req)
1682
	__must_hold(&ctx->uring_lock)
1683
{
1684
	struct io_ring_ctx *ctx = req->ctx;
1685
	struct io_defer_entry *de;
1686
	int ret;
1687
	u32 seq = io_get_sequence(req);
1688

1689
	/* Still need defer if there is pending req in defer list. */
1690
	spin_lock(&ctx->completion_lock);
1691
	if (!req_need_defer(req, seq) && list_empty_careful(&ctx->defer_list)) {
1692
		spin_unlock(&ctx->completion_lock);
1693
queue:
1694
		ctx->drain_active = false;
1695 1696
		io_req_task_queue(req);
		return;
1697
	}
1698
	spin_unlock(&ctx->completion_lock);
1699

1700
	io_prep_async_link(req);
1701
	de = kmalloc(sizeof(*de), GFP_KERNEL);
1702
	if (!de) {
1703
		ret = -ENOMEM;
1704 1705
		io_req_defer_failed(req, ret);
		return;
1706
	}
1707

1708
	spin_lock(&ctx->completion_lock);
1709
	if (!req_need_defer(req, seq) && list_empty(&ctx->defer_list)) {
1710
		spin_unlock(&ctx->completion_lock);
1711
		kfree(de);
1712
		goto queue;
1713 1714
	}

1715
	trace_io_uring_defer(req);
1716
	de->req = req;
1717
	de->seq = seq;
1718
	list_add_tail(&de->list, &ctx->defer_list);
1719
	spin_unlock(&ctx->completion_lock);
1720 1721
}

1722 1723
static bool io_assign_file(struct io_kiocb *req, const struct io_issue_def *def,
			   unsigned int issue_flags)
1724
{
1725
	if (req->file || !def->needs_file)
1726 1727 1728
		return true;

	if (req->flags & REQ_F_FIXED_FILE)
1729
		req->file = io_file_get_fixed(req, req->cqe.fd, issue_flags);
1730
	else
1731
		req->file = io_file_get_normal(req, req->cqe.fd);
1732

1733
	return !!req->file;
1734 1735
}

1736
static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
{
1738
	const struct io_issue_def *def = &io_issue_defs[req->opcode];
1739
	const struct cred *creds = NULL;
1740
	int ret;

1742
	if (unlikely(!io_assign_file(req, def, issue_flags)))
1743 1744
		return -EBADF;

1745
	if (unlikely((req->flags & REQ_F_CREDS) && req->creds != current_cred()))
1746
		creds = override_creds(req->creds);
1747

1748
	if (!def->audit_skip)
1749 1750
		audit_uring_entry(req->opcode);

1751
	ret = def->issue(req, issue_flags);

1753
	if (!def->audit_skip)
1754 1755
		audit_uring_exit(!ret, ret);

1756 1757
	if (creds)
		revert_creds(creds);
1758

1759 1760
	if (ret == IOU_OK) {
		if (issue_flags & IO_URING_F_COMPLETE_DEFER)
1761
			io_req_complete_defer(req);
1762
		else
1763
			io_req_complete_post(req, issue_flags);
1764

1765 1766
		return 0;
	}
1767

1768 1769 1770
	if (ret == IOU_ISSUE_SKIP_COMPLETE) {
		ret = 0;
		io_arm_ltimeout(req);
1771

1772 1773 1774 1775 1776
		/* If the op doesn't have a file, we're not polling for it */
		if ((req->ctx->flags & IORING_SETUP_IOPOLL) && def->iopoll_queue)
			io_iopoll_req_issued(req, issue_flags);
	}
	return ret;
Jens Axboe's avatar
Jens Axboe committed
1777 1778
}

int io_poll_issue(struct io_kiocb *req, struct io_tw_state *ts)
{
	io_tw_lock(req->ctx, ts);
	return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT|
				 IO_URING_F_COMPLETE_DEFER);
}

struct io_wq_work *io_wq_free_work(struct io_wq_work *work)
{
	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
	struct io_kiocb *nxt = NULL;

	if (req_ref_put_and_test(req)) {
		if (req->flags & IO_REQ_LINK_FLAGS)
			nxt = io_req_find_next(req);
		io_free_req(req);
	}
	return nxt ? &nxt->work : NULL;
}

void io_wq_submit_work(struct io_wq_work *work)
{
	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
	const struct io_issue_def *def = &io_issue_defs[req->opcode];
	unsigned int issue_flags = IO_URING_F_UNLOCKED | IO_URING_F_IOWQ;
	bool needs_poll = false;
	int ret = 0, err = -ECANCELED;

	/* one will be dropped by ->io_wq_free_work() after returning to io-wq */
	if (!(req->flags & REQ_F_REFCOUNT))
		__io_req_set_refcount(req, 2);
	else
		req_ref_get(req);

	io_arm_ltimeout(req);

	/* either cancelled or io-wq is dying, so don't touch tctx->iowq */
	if (work->flags & IO_WQ_WORK_CANCEL) {
fail:
		io_req_task_queue_fail(req, err);
		return;
	}
	if (!io_assign_file(req, def, issue_flags)) {
		err = -EBADF;
		work->flags |= IO_WQ_WORK_CANCEL;
		goto fail;
	}

	/*
	 * If DEFER_TASKRUN is set, it's only allowed to post CQEs from the
	 * submitter task context. Final request completions are handed to the
	 * right context, however this is not the case for auxiliary CQEs,
	 * which are the main mode of operation for multishot requests.
	 * Don't allow any multishot execution from io-wq. It's more restrictive
	 * than necessary and also cleaner.
	 */
	if (req->flags & REQ_F_APOLL_MULTISHOT) {
		err = -EBADFD;
		if (!io_file_can_poll(req))
			goto fail;
		if (req->file->f_flags & O_NONBLOCK ||
		    req->file->f_mode & FMODE_NOWAIT) {
			err = -ECANCELED;
			if (io_arm_poll_handler(req, issue_flags) != IO_APOLL_OK)
				goto fail;
			return;
		} else {
			req->flags &= ~REQ_F_APOLL_MULTISHOT;
		}
	}

	if (req->flags & REQ_F_FORCE_ASYNC) {
		bool opcode_poll = def->pollin || def->pollout;

		if (opcode_poll && io_file_can_poll(req)) {
			needs_poll = true;
			issue_flags |= IO_URING_F_NONBLOCK;
		}
	}

	do {
		ret = io_issue_sqe(req, issue_flags);
		if (ret != -EAGAIN)
			break;

		/*
		 * If REQ_F_NOWAIT is set, then don't wait or retry with
		 * poll. -EAGAIN is final for that case.
		 */
		if (req->flags & REQ_F_NOWAIT)
			break;

		/*
		 * We can get EAGAIN for iopolled IO even though we're
		 * forcing a sync submission from here, since we can't
		 * wait for request slots on the block side.
		 */
		if (!needs_poll) {
			if (!(req->ctx->flags & IORING_SETUP_IOPOLL))
				break;
			if (io_wq_worker_stopped())
				break;
			cond_resched();
			continue;
		}

		if (io_arm_poll_handler(req, issue_flags) == IO_APOLL_OK)
			return;
		/* aborted or ready, in either case retry blocking */
		needs_poll = false;
		issue_flags &= ~IO_URING_F_NONBLOCK;
	} while (1);

	/* avoid locking problems by failing it from a clean context */
	if (ret < 0)
		io_req_task_queue_fail(req, ret);
}

inline struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
				      unsigned int issue_flags)
{
	struct io_ring_ctx *ctx = req->ctx;
	struct io_fixed_file *slot;
	struct file *file = NULL;

	io_ring_submit_lock(ctx, issue_flags);

	if (unlikely((unsigned int)fd >= ctx->nr_user_files))
		goto out;
	fd = array_index_nospec(fd, ctx->nr_user_files);
	slot = io_fixed_file_slot(&ctx->file_table, fd);
	if (!req->rsrc_node)
		__io_req_set_rsrc_node(req, ctx);
	req->flags |= io_slot_flags(slot);
	file = io_slot_file(slot);
out:
	io_ring_submit_unlock(ctx, issue_flags);
	return file;
}

struct file *io_file_get_normal(struct io_kiocb *req, int fd)
{
	struct file *file = fget(fd);

	trace_io_uring_file_get(req, fd);

	/* we don't allow fixed io_uring files */
	if (file && io_is_uring_fops(file))
		io_req_track_inflight(req);
	return file;
}

static void io_queue_async(struct io_kiocb *req, int ret)
	__must_hold(&req->ctx->uring_lock)
{
	struct io_kiocb *linked_timeout;

	if (ret != -EAGAIN || (req->flags & REQ_F_NOWAIT)) {
		io_req_defer_failed(req, ret);
		return;
	}

	linked_timeout = io_prep_linked_timeout(req);

	switch (io_arm_poll_handler(req, 0)) {
	case IO_APOLL_READY:
		io_kbuf_recycle(req, 0);
		io_req_task_queue(req);
		break;
	case IO_APOLL_ABORTED:
		io_kbuf_recycle(req, 0);
		io_queue_iowq(req);
		break;
	case IO_APOLL_OK:
		break;
	}

	if (linked_timeout)
		io_queue_linked_timeout(linked_timeout);
}

static inline void io_queue_sqe(struct io_kiocb *req)
	__must_hold(&req->ctx->uring_lock)
{
	int ret;

	ret = io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);

	/*
	 * We async punt it if the file wasn't marked NOWAIT, or if the file
	 * doesn't support non-blocking read/write attempts
	 */
	if (unlikely(ret))
		io_queue_async(req, ret);
}

static void io_queue_sqe_fallback(struct io_kiocb *req)
	__must_hold(&req->ctx->uring_lock)
{
	if (unlikely(req->flags & REQ_F_FAIL)) {
		/*
		 * We don't submit, fail them all, for that replace hardlinks
		 * with normal links. Extra REQ_F_LINK is tolerated.
		 */
		req->flags &= ~REQ_F_HARDLINK;
		req->flags |= REQ_F_LINK;
		io_req_defer_failed(req, req->cqe.res);
	} else {
		if (unlikely(req->ctx->drain_active))
			io_drain_req(req);
		else
			io_queue_iowq(req);
	}
}

/*
 * Check SQE restrictions (opcode and flags).
 *
 * Returns 'true' if SQE is allowed, 'false' otherwise.
 */
static inline bool io_check_restriction(struct io_ring_ctx *ctx,
					struct io_kiocb *req,
					unsigned int sqe_flags)
{
	if (!test_bit(req->opcode, ctx->restrictions.sqe_op))
		return false;

	if ((sqe_flags & ctx->restrictions.sqe_flags_required) !=
	    ctx->restrictions.sqe_flags_required)
		return false;

	if (sqe_flags & ~(ctx->restrictions.sqe_flags_allowed |
			  ctx->restrictions.sqe_flags_required))
		return false;

	return true;
}
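
/*
 * Illustrative sketch (not part of the original source): the restriction
 * bitmaps checked above are installed from userspace via the
 * io_uring_register(2) syscall with IORING_REGISTER_RESTRICTIONS while
 * the ring is still disabled (IORING_SETUP_R_DISABLED), roughly:
 *
 *	struct io_uring_restriction res[2] = {};
 *
 *	res[0].opcode = IORING_RESTRICTION_SQE_OP;
 *	res[0].sqe_op = IORING_OP_READV;
 *	res[1].opcode = IORING_RESTRICTION_SQE_FLAGS_ALLOWED;
 *	res[1].sqe_flags = IOSQE_FIXED_FILE;
 *
 *	io_uring_register(ring_fd, IORING_REGISTER_RESTRICTIONS, res, 2);
 *	io_uring_register(ring_fd, IORING_REGISTER_ENABLE_RINGS, NULL, 0);
 *
 * where io_uring_register() stands for the raw syscall. After that, only
 * READV SQEs with at most IOSQE_FIXED_FILE set pass the checks above.
 */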

static void io_init_req_drain(struct io_kiocb *req)
{
	struct io_ring_ctx *ctx = req->ctx;
	struct io_kiocb *head = ctx->submit_state.link.head;

	ctx->drain_active = true;
	if (head) {
		/*
		 * If we need to drain a request in the middle of a link, drain
		 * the head request and the next request/link after the current
		 * link. Considering sequential execution of links,
		 * REQ_F_IO_DRAIN will be maintained for every request of our
		 * link.
		 */
		head->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
		ctx->drain_next = true;
	}
}
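
/*
 * Illustrative note (not part of the original source): from userspace,
 * draining is requested per-SQE by setting IOSQE_IO_DRAIN in sqe->flags,
 * e.g. an fsync that must not start before all previously submitted
 * requests have completed:
 *
 *	sqe->opcode = IORING_OP_FSYNC;
 *	sqe->fd = file_fd;
 *	sqe->flags |= IOSQE_IO_DRAIN;
 *
 * When such an SQE appears in the middle of an IOSQE_IO_LINK chain, the
 * code above marks the link head instead, so the whole chain observes the
 * drain ordering.
 */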

static __cold int io_init_fail_req(struct io_kiocb *req, int err)
{
	/* ensure per-opcode data is cleared if we fail before prep */
	memset(&req->cmd.data, 0, sizeof(req->cmd.data));
	return err;
}

static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
		       const struct io_uring_sqe *sqe)
	__must_hold(&ctx->uring_lock)
{
	const struct io_issue_def *def;
	unsigned int sqe_flags;
	int personality;
	u8 opcode;

	/* req is partially pre-initialised, see io_preinit_req() */
	req->opcode = opcode = READ_ONCE(sqe->opcode);
	/* same numerical values with corresponding REQ_F_*, safe to copy */
	sqe_flags = READ_ONCE(sqe->flags);
	req->flags = (io_req_flags_t) sqe_flags;
	req->cqe.user_data = READ_ONCE(sqe->user_data);
	req->file = NULL;
	req->rsrc_node = NULL;
	req->task = current;

	if (unlikely(opcode >= IORING_OP_LAST)) {
		req->opcode = 0;
		return io_init_fail_req(req, -EINVAL);
	}
	def = &io_issue_defs[opcode];
	if (unlikely(sqe_flags & ~SQE_COMMON_FLAGS)) {
		/* enforce forwards compatibility on users */
		if (sqe_flags & ~SQE_VALID_FLAGS)
			return io_init_fail_req(req, -EINVAL);
		if (sqe_flags & IOSQE_BUFFER_SELECT) {
			if (!def->buffer_select)
				return io_init_fail_req(req, -EOPNOTSUPP);
			req->buf_index = READ_ONCE(sqe->buf_group);
		}
		if (sqe_flags & IOSQE_CQE_SKIP_SUCCESS)
			ctx->drain_disabled = true;
		if (sqe_flags & IOSQE_IO_DRAIN) {
			if (ctx->drain_disabled)
				return io_init_fail_req(req, -EOPNOTSUPP);
			io_init_req_drain(req);
		}
	}
	if (unlikely(ctx->restricted || ctx->drain_active || ctx->drain_next)) {
		if (ctx->restricted && !io_check_restriction(ctx, req, sqe_flags))
			return io_init_fail_req(req, -EACCES);
		/* knock it to the slow queue path, will be drained there */
		if (ctx->drain_active)
			req->flags |= REQ_F_FORCE_ASYNC;
		/* if there is no link, we're at "next" request and need to drain */
		if (unlikely(ctx->drain_next) && !ctx->submit_state.link.head) {
			ctx->drain_next = false;
			ctx->drain_active = true;
			req->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
		}
	}

	if (!def->ioprio && sqe->ioprio)
		return io_init_fail_req(req, -EINVAL);
	if (!def->iopoll && (ctx->flags & IORING_SETUP_IOPOLL))
		return io_init_fail_req(req, -EINVAL);

	if (def->needs_file) {
		struct io_submit_state *state = &ctx->submit_state;

		req->cqe.fd = READ_ONCE(sqe->fd);

		/*
		 * Plug now if we have more than 2 IO left after this, and the
		 * target is potentially a read/write to block based storage.
		 */
		if (state->need_plug && def->plug) {
			state->plug_started = true;
			state->need_plug = false;
			blk_start_plug_nr_ios(&state->plug, state->submit_nr);
		}
	}

	personality = READ_ONCE(sqe->personality);
	if (personality) {
		int ret;

		req->creds = xa_load(&ctx->personalities, personality);
		if (!req->creds)
			return io_init_fail_req(req, -EINVAL);
		get_cred(req->creds);
		ret = security_uring_override_creds(req->creds);
		if (ret) {
			put_cred(req->creds);
			return io_init_fail_req(req, ret);
		}
		req->flags |= REQ_F_CREDS;
	}

	return def->prep(req, sqe);
}
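
/*
 * Illustrative sketch (not part of the original source): the
 * sqe->personality id consumed above is obtained by registering the
 * current credentials via the io_uring_register(2) syscall, roughly:
 *
 *	int id = io_uring_register(ring_fd, IORING_REGISTER_PERSONALITY,
 *				   NULL, 0);
 *	...
 *	sqe->personality = id;	// issue this SQE with those credentials
 *
 * A later IORING_UNREGISTER_PERSONALITY with the same id drops the stored
 * credentials again.
 */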

static __cold int io_submit_fail_init(const struct io_uring_sqe *sqe,
				      struct io_kiocb *req, int ret)
{
	struct io_ring_ctx *ctx = req->ctx;
	struct io_submit_link *link = &ctx->submit_state.link;
	struct io_kiocb *head = link->head;

	trace_io_uring_req_failed(sqe, req, ret);

	/*
	 * Avoid breaking links in the middle as it renders links with SQPOLL
	 * unusable. Instead of failing eagerly, continue assembling the link if
	 * applicable and mark the head with REQ_F_FAIL. The link flushing code
	 * should find the flag and handle the rest.
	 */
	req_fail_link_node(req, ret);
	if (head && !(head->flags & REQ_F_FAIL))
		req_fail_link_node(head, -ECANCELED);

	if (!(req->flags & IO_REQ_LINK_FLAGS)) {
		if (head) {
			link->last->link = req;
			link->head = NULL;
			req = head;
		}
		io_queue_sqe_fallback(req);
		return ret;
	}

	if (head)
		link->last->link = req;
	else
		link->head = req;
	link->last = req;
	return 0;
}

static inline int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
			 const struct io_uring_sqe *sqe)
	__must_hold(&ctx->uring_lock)
{
	struct io_submit_link *link = &ctx->submit_state.link;
	int ret;

	ret = io_init_req(ctx, req, sqe);
	if (unlikely(ret))
		return io_submit_fail_init(sqe, req, ret);

	trace_io_uring_submit_req(req);

	/*
	 * If we already have a head request, queue this one for async
	 * submittal once the head completes. If we don't have a head but
	 * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
	 * submitted sync once the chain is complete. If none of those
	 * conditions are true (normal request), then just queue it.
	 */
	if (unlikely(link->head)) {
		trace_io_uring_link(req, link->head);
		link->last->link = req;
		link->last = req;

		if (req->flags & IO_REQ_LINK_FLAGS)
			return 0;
		/* last request of the link, flush it */
		req = link->head;
		link->head = NULL;
		if (req->flags & (REQ_F_FORCE_ASYNC | REQ_F_FAIL))
			goto fallback;

	} else if (unlikely(req->flags & (IO_REQ_LINK_FLAGS |
					  REQ_F_FORCE_ASYNC | REQ_F_FAIL))) {
		if (req->flags & IO_REQ_LINK_FLAGS) {
			link->head = req;
			link->last = req;
		} else {
fallback:
			io_queue_sqe_fallback(req);
		}
		return 0;
	}

	io_queue_sqe(req);
	return 0;
}

/*
 * Batched submission is done, ensure local IO is flushed out.
 */
static void io_submit_state_end(struct io_ring_ctx *ctx)
{
	struct io_submit_state *state = &ctx->submit_state;

	if (unlikely(state->link.head))
		io_queue_sqe_fallback(state->link.head);
	/* flush only after queuing links as they can generate completions */
	io_submit_flush_completions(ctx);
	if (state->plug_started)
		blk_finish_plug(&state->plug);
}

/*
 * Start submission side cache.
 */
static void io_submit_state_start(struct io_submit_state *state,
				  unsigned int max_ios)
{
	state->plug_started = false;
	state->need_plug = max_ios > 2;
	state->submit_nr = max_ios;
	/* set only head, no need to init link_last in advance */
	state->link.head = NULL;
}

static void io_commit_sqring(struct io_ring_ctx *ctx)
{
	struct io_rings *rings = ctx->rings;

	/*
	 * Ensure any loads from the SQEs are done at this point,
	 * since once we write the new head, the application could
	 * write new data to them.
	 */
	smp_store_release(&rings->sq.head, ctx->cached_sq_head);
}

/*
 * Fetch an sqe, if one is available. Note this returns a pointer to memory
 * that is mapped by userspace. This means that care needs to be taken to
 * ensure that reads are stable, as we cannot rely on userspace always
 * being a good citizen. If members of the sqe are validated and then later
 * used, it's important that those reads are done through READ_ONCE() to
 * prevent a re-load down the line.
 */
static bool io_get_sqe(struct io_ring_ctx *ctx, const struct io_uring_sqe **sqe)
{
	unsigned mask = ctx->sq_entries - 1;
	unsigned head = ctx->cached_sq_head++ & mask;

	if (!(ctx->flags & IORING_SETUP_NO_SQARRAY)) {
		head = READ_ONCE(ctx->sq_array[head]);
		if (unlikely(head >= ctx->sq_entries)) {
			/* drop invalid entries */
			spin_lock(&ctx->completion_lock);
			ctx->cq_extra--;
			spin_unlock(&ctx->completion_lock);
			WRITE_ONCE(ctx->rings->sq_dropped,
				   READ_ONCE(ctx->rings->sq_dropped) + 1);
			return false;
		}
	}

	/*
	 * The cached sq head (or cq tail) serves two purposes:
	 *
	 * 1) allows us to batch the cost of updating the user visible
	 *    head updates.
	 * 2) allows the kernel side to track the head on its own, even
	 *    though the application is the one updating it.
	 */

	/* double index for 128-byte SQEs, twice as long */
	if (ctx->flags & IORING_SETUP_SQE128)
		head <<= 1;
	*sqe = &ctx->sq_sqes[head];
	return true;
}
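
/*
 * Illustrative sketch (not part of the original source) of the userspace
 * side this function pairs with: the application fills an SQE, writes its
 * index into the SQ array, and publishes the new tail with a release
 * store so that the kernel sees a fully initialised entry. The names
 * sq_tail/sq_ring_mask/sq_array/sqes stand for the application's
 * mmap()ed ring pointers; free-space checking is omitted for brevity.
 *
 *	unsigned tail = *sq_tail;
 *	unsigned idx = tail & *sq_ring_mask;
 *	struct io_uring_sqe *sqe = &sqes[idx];
 *
 *	memset(sqe, 0, sizeof(*sqe));
 *	sqe->opcode = IORING_OP_NOP;
 *	sqe->user_data = 0x1234;
 *	sq_array[idx] = idx;
 *	__atomic_store_n(sq_tail, tail + 1, __ATOMIC_RELEASE);
 */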

int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
	__must_hold(&ctx->uring_lock)
{
	unsigned int entries = io_sqring_entries(ctx);
	unsigned int left;
	int ret;

	if (unlikely(!entries))
		return 0;
	/* make sure SQ entry isn't read before tail */
	ret = left = min(nr, entries);
	io_get_task_refs(left);
	io_submit_state_start(&ctx->submit_state, left);

	do {
		const struct io_uring_sqe *sqe;
		struct io_kiocb *req;

		if (unlikely(!io_alloc_req(ctx, &req)))
			break;
		if (unlikely(!io_get_sqe(ctx, &sqe))) {
			io_req_add_to_cache(req, ctx);
			break;
		}

		/*
		 * Continue submitting even for sqe failure if the
		 * ring was setup with IORING_SETUP_SUBMIT_ALL
		 */
		if (unlikely(io_submit_sqe(ctx, req, sqe)) &&
		    !(ctx->flags & IORING_SETUP_SUBMIT_ALL)) {
			left--;
			break;
		}
	} while (--left);

	if (unlikely(left)) {
		ret -= left;
		/* try again if it submitted nothing and can't allocate a req */
		if (!ret && io_req_cache_empty(ctx))
			ret = -EAGAIN;
		current->io_uring->cached_refs += left;
	}

	io_submit_state_end(ctx);
	 /* Commit SQ ring head once we've consumed and submitted all SQEs */
	io_commit_sqring(ctx);
	return ret;
}

static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
			    int wake_flags, void *key)
{
	struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);

	/*
	 * Cannot safely flush overflowed CQEs from here, ensure we wake up
	 * the task, and the next invocation will do it.
	 */
	if (io_should_wake(iowq) || io_has_work(iowq->ctx))
		return autoremove_wake_function(curr, mode, wake_flags, key);
	return -1;
}

int io_run_task_work_sig(struct io_ring_ctx *ctx)
{
	if (!llist_empty(&ctx->work_llist)) {
		__set_current_state(TASK_RUNNING);
		if (io_run_local_work(ctx, INT_MAX) > 0)
			return 0;
	}
	if (io_run_task_work() > 0)
		return 0;
	if (task_sigpending(current))
		return -EINTR;
	return 0;
}

static bool current_pending_io(void)
{
	struct io_uring_task *tctx = current->io_uring;

	if (!tctx)
		return false;
	return percpu_counter_read_positive(&tctx->inflight);
}

/* when this returns >0, the caller should retry */
static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
					  struct io_wait_queue *iowq)
{
	int ret;

	if (unlikely(READ_ONCE(ctx->check_cq)))
		return 1;
	if (unlikely(!llist_empty(&ctx->work_llist)))
		return 1;
	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL)))
		return 1;
	if (unlikely(task_sigpending(current)))
		return -EINTR;
	if (unlikely(io_should_wake(iowq)))
		return 0;

	/*
	 * Mark us as being in io_wait if we have pending requests, so cpufreq
	 * can take into account that the task is waiting for IO - turns out
	 * to be important for low QD IO.
	 */
	if (current_pending_io())
		current->in_iowait = 1;
	ret = 0;
	if (iowq->timeout == KTIME_MAX)
		schedule();
	else if (!schedule_hrtimeout(&iowq->timeout, HRTIMER_MODE_ABS))
		ret = -ETIME;
	current->in_iowait = 0;
	return ret;
}

/*
 * Wait until events become available, if we don't already have some. The
 * application must reap them itself, as they reside on the shared cq ring.
 */
static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
			  const sigset_t __user *sig, size_t sigsz,
			  struct __kernel_timespec __user *uts)
{
	struct io_wait_queue iowq;
	struct io_rings *rings = ctx->rings;
	int ret;

	if (!io_allowed_run_tw(ctx))
		return -EEXIST;
	if (!llist_empty(&ctx->work_llist))
		io_run_local_work(ctx, min_events);
	io_run_task_work();

	if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)))
		io_cqring_do_overflow_flush(ctx);
	if (__io_cqring_events_user(ctx) >= min_events)
		return 0;

	init_waitqueue_func_entry(&iowq.wq, io_wake_function);
	iowq.wq.private = current;
	INIT_LIST_HEAD(&iowq.wq.entry);
	iowq.ctx = ctx;
	iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
	iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
	iowq.timeout = KTIME_MAX;

	if (uts) {
		struct timespec64 ts;

		if (get_timespec64(&ts, uts))
			return -EFAULT;

		iowq.timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns());
		io_napi_adjust_timeout(ctx, &iowq, &ts);
	}

	if (sig) {
#ifdef CONFIG_COMPAT
		if (in_compat_syscall())
			ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
						      sigsz);
		else
#endif
			ret = set_user_sigmask(sig, sigsz);

		if (ret)
			return ret;
	}

	io_napi_busy_loop(ctx, &iowq);

	trace_io_uring_cqring_wait(ctx, min_events);
	do {
		int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail);
		unsigned long check_cq;

		if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
			atomic_set(&ctx->cq_wait_nr, nr_wait);
			set_current_state(TASK_INTERRUPTIBLE);
		} else {
			prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
							TASK_INTERRUPTIBLE);
		}

		ret = io_cqring_wait_schedule(ctx, &iowq);
		__set_current_state(TASK_RUNNING);
		atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);

		/*
		 * Run task_work after scheduling and before io_should_wake().
		 * If we got woken because of task_work being processed, run it
		 * now rather than let the caller do another wait loop.
		 */
		io_run_task_work();
		if (!llist_empty(&ctx->work_llist))
			io_run_local_work(ctx, nr_wait);

		/*
		 * Non-local task_work will be run on exit to userspace, but
		 * if we're using DEFER_TASKRUN, then we could have waited
		 * with a timeout for a number of requests. If the timeout
		 * hits, we could have some requests ready to process. Ensure
		 * this break is _after_ we have run task_work, to avoid
		 * deferring running potentially pending requests until the
		 * next time we wait for events.
		 */
		if (ret < 0)
			break;

		check_cq = READ_ONCE(ctx->check_cq);
		if (unlikely(check_cq)) {
			/* let the caller flush overflows, retry */
			if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
				io_cqring_do_overflow_flush(ctx);
			if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
				ret = -EBADR;
				break;
			}
		}

		if (io_should_wake(&iowq)) {
			ret = 0;
			break;
		}
		cond_resched();
	} while (1);

	if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
		finish_wait(&ctx->cq_wait, &iowq.wq);
	restore_saved_sigmask_unless(ret == -EINTR);

	return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
}
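
/*
 * Illustrative sketch (not part of the original source) of how the
 * application reaps the CQEs this function waits for: load the CQ tail
 * with acquire semantics, consume the entries, then publish the new head
 * with a release store. cq_head/cq_tail/cq_ring_mask/cqes stand for the
 * application's mmap()ed CQ ring pointers, and handle_completion() is
 * whatever the application does with a finished request.
 *
 *	unsigned head = *cq_head;
 *	unsigned tail = __atomic_load_n(cq_tail, __ATOMIC_ACQUIRE);
 *
 *	while (head != tail) {
 *		struct io_uring_cqe *cqe = &cqes[head & *cq_ring_mask];
 *
 *		handle_completion(cqe->user_data, cqe->res);
 *		head++;
 *	}
 *	__atomic_store_n(cq_head, head, __ATOMIC_RELEASE);
 */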

static void *io_rings_map(struct io_ring_ctx *ctx, unsigned long uaddr,
			  size_t size)
{
	return __io_uaddr_map(&ctx->ring_pages, &ctx->n_ring_pages, uaddr,
				size);
}

static void *io_sqes_map(struct io_ring_ctx *ctx, unsigned long uaddr,
			 size_t size)
{
	return __io_uaddr_map(&ctx->sqe_pages, &ctx->n_sqe_pages, uaddr,
				size);
}

static void io_rings_free(struct io_ring_ctx *ctx)
{
	if (!(ctx->flags & IORING_SETUP_NO_MMAP)) {
		io_pages_unmap(ctx->rings, &ctx->ring_pages, &ctx->n_ring_pages,
				true);
		io_pages_unmap(ctx->sq_sqes, &ctx->sqe_pages, &ctx->n_sqe_pages,
				true);
	} else {
		io_pages_free(&ctx->ring_pages, ctx->n_ring_pages);
		ctx->n_ring_pages = 0;
		io_pages_free(&ctx->sqe_pages, ctx->n_sqe_pages);
		ctx->n_sqe_pages = 0;
		vunmap(ctx->rings);
		vunmap(ctx->sq_sqes);
	}

	ctx->rings = NULL;
	ctx->sq_sqes = NULL;
}

static unsigned long rings_size(struct io_ring_ctx *ctx, unsigned int sq_entries,
				unsigned int cq_entries, size_t *sq_offset)
{
	struct io_rings *rings;
	size_t off, sq_array_size;

	off = struct_size(rings, cqes, cq_entries);
	if (off == SIZE_MAX)
		return SIZE_MAX;
	if (ctx->flags & IORING_SETUP_CQE32) {
		if (check_shl_overflow(off, 1, &off))
			return SIZE_MAX;
	}

#ifdef CONFIG_SMP
	off = ALIGN(off, SMP_CACHE_BYTES);
	if (off == 0)
		return SIZE_MAX;
#endif

	if (ctx->flags & IORING_SETUP_NO_SQARRAY) {
		if (sq_offset)
			*sq_offset = SIZE_MAX;
		return off;
	}

	if (sq_offset)
		*sq_offset = off;

	sq_array_size = array_size(sizeof(u32), sq_entries);
	if (sq_array_size == SIZE_MAX)
		return SIZE_MAX;

	if (check_add_overflow(off, sq_array_size, &off))
		return SIZE_MAX;

	return off;
}
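
/*
 * Illustrative arithmetic (not part of the original source), ignoring the
 * SMP cache-line alignment step: for sq_entries == 8 and cq_entries == 16
 * on a default setup (16-byte CQEs, SQ index array present),
 *
 *	off        = sizeof(struct io_rings) + 16 * sizeof(struct io_uring_cqe)
 *	*sq_offset = off
 *	total      = off + 8 * sizeof(u32)
 *
 * With IORING_SETUP_CQE32 the size computed so far is doubled (32-byte
 * CQEs); with IORING_SETUP_NO_SQARRAY the trailing u32 index array is
 * dropped entirely.
 */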

static void io_req_caches_free(struct io_ring_ctx *ctx)
{
	struct io_kiocb *req;
	int nr = 0;

	mutex_lock(&ctx->uring_lock);

	while (!io_req_cache_empty(ctx)) {
		req = io_extract_req(ctx);
		kmem_cache_free(req_cachep, req);
		nr++;
	}
	if (nr)
		percpu_ref_put_many(&ctx->refs, nr);
	mutex_unlock(&ctx->uring_lock);
}

static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
{
	io_sq_thread_finish(ctx);
	/* __io_rsrc_put_work() may need uring_lock to progress, wait w/o it */
	if (WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list)))
		return;

	mutex_lock(&ctx->uring_lock);
	if (ctx->buf_data)
		__io_sqe_buffers_unregister(ctx);
	if (ctx->file_data)
		__io_sqe_files_unregister(ctx);
	io_cqring_overflow_kill(ctx);
	io_eventfd_unregister(ctx);
	io_alloc_cache_free(&ctx->apoll_cache, kfree);
	io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
	io_alloc_cache_free(&ctx->rw_cache, io_rw_cache_free);
	io_alloc_cache_free(&ctx->uring_cache, kfree);
	io_futex_cache_free(ctx);
	io_destroy_buffers(ctx);
	mutex_unlock(&ctx->uring_lock);
	if (ctx->sq_creds)
		put_cred(ctx->sq_creds);
	if (ctx->submitter_task)
		put_task_struct(ctx->submitter_task);

	/* there are no registered resources left, nobody uses it */
	if (ctx->rsrc_node)
		io_rsrc_node_destroy(ctx, ctx->rsrc_node);

	WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list));
	WARN_ON_ONCE(!list_empty(&ctx->ltimeout_list));

	io_alloc_cache_free(&ctx->rsrc_node_cache, kfree);
	if (ctx->mm_account) {
		mmdrop(ctx->mm_account);
		ctx->mm_account = NULL;
	}
	io_rings_free(ctx);

	percpu_ref_exit(&ctx->refs);
	free_uid(ctx->user);
	io_req_caches_free(ctx);
	if (ctx->hash_map)
		io_wq_put_hash(ctx->hash_map);
	io_napi_free(ctx);
	kfree(ctx->cancel_table.hbs);
	kfree(ctx->cancel_table_locked.hbs);
	xa_destroy(&ctx->io_bl_xa);
	kfree(ctx);
}

static __cold void io_activate_pollwq_cb(struct callback_head *cb)
{
	struct io_ring_ctx *ctx = container_of(cb, struct io_ring_ctx,
					       poll_wq_task_work);

	mutex_lock(&ctx->uring_lock);
	ctx->poll_activated = true;
	mutex_unlock(&ctx->uring_lock);

	/*
	 * Wake ups for some events between start of polling and activation
	 * might've been lost due to loose synchronisation.
	 */
	wake_up_all(&ctx->poll_wq);
	percpu_ref_put(&ctx->refs);
}

__cold void io_activate_pollwq(struct io_ring_ctx *ctx)
{
	spin_lock(&ctx->completion_lock);
	/* already activated or in progress */
	if (ctx->poll_activated || ctx->poll_wq_task_work.func)
		goto out;
	if (WARN_ON_ONCE(!ctx->task_complete))
		goto out;
	if (!ctx->submitter_task)
		goto out;
	/*
	 * with ->submitter_task only the submitter task completes requests, we
	 * only need to sync with it, which is done by injecting a tw
	 */
	init_task_work(&ctx->poll_wq_task_work, io_activate_pollwq_cb);
	percpu_ref_get(&ctx->refs);
	if (task_work_add(ctx->submitter_task, &ctx->poll_wq_task_work, TWA_SIGNAL))
		percpu_ref_put(&ctx->refs);
out:
	spin_unlock(&ctx->completion_lock);
}

static __poll_t io_uring_poll(struct file *file, poll_table *wait)
{
	struct io_ring_ctx *ctx = file->private_data;
	__poll_t mask = 0;

	if (unlikely(!ctx->poll_activated))
		io_activate_pollwq(ctx);

	poll_wait(file, &ctx->poll_wq, wait);
	/*
	 * synchronizes with barrier from wq_has_sleeper call in
	 * io_commit_cqring
	 */
	smp_rmb();
	if (!io_sqring_full(ctx))
		mask |= EPOLLOUT | EPOLLWRNORM;

	/*
	 * Don't flush cqring overflow list here, just do a simple check.
	 * Otherwise there could possibly be an ABBA deadlock:
	 *      CPU0                    CPU1
	 *      ----                    ----
	 * lock(&ctx->uring_lock);
	 *                              lock(&ep->mtx);
	 *                              lock(&ctx->uring_lock);
	 * lock(&ep->mtx);
	 *
	 * Users may get EPOLLIN meanwhile seeing nothing in cqring, this
	 * pushes them to do the flush.
	 */

	if (__io_cqring_events_user(ctx) || io_has_work(ctx))
		mask |= EPOLLIN | EPOLLRDNORM;

	return mask;
}
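
/*
 * Illustrative sketch (not part of the original source): an application
 * can add the io_uring fd to an epoll set and use the events reported by
 * the function above, EPOLLIN meaning "completions (or work) may be
 * pending" and EPOLLOUT meaning "the SQ ring has free slots":
 *
 *	struct epoll_event ev = {
 *		.events = EPOLLIN,
 *		.data.fd = ring_fd,
 *	};
 *
 *	epoll_ctl(epfd, EPOLL_CTL_ADD, ring_fd, &ev);
 *	// on EPOLLIN: reap the CQ ring, which may still turn out empty if
 *	// the completions sit on the overflow list (see comment above).
 */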

struct io_tctx_exit {
	struct callback_head		task_work;
	struct completion		completion;
	struct io_ring_ctx		*ctx;
};

static __cold void io_tctx_exit_cb(struct callback_head *cb)
{
	struct io_uring_task *tctx = current->io_uring;
	struct io_tctx_exit *work;

	work = container_of(cb, struct io_tctx_exit, task_work);
	/*
	 * When @in_cancel, we're in cancellation and it's racy to remove the
	 * node. It'll be removed by the end of cancellation, just ignore it.
	 * tctx can be NULL if the queueing of this task_work raced with
	 * work cancelation off the exec path.
	 */
	if (tctx && !atomic_read(&tctx->in_cancel))
		io_uring_del_tctx_node((unsigned long)work->ctx);
	complete(&work->completion);
}

static __cold bool io_cancel_ctx_cb(struct io_wq_work *work, void *data)
{
	struct io_kiocb *req = container_of(work, struct io_kiocb, work);

	return req->ctx == data;
}

static __cold void io_ring_exit_work(struct work_struct *work)
{
	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, exit_work);
	unsigned long timeout = jiffies + HZ * 60 * 5;
	unsigned long interval = HZ / 20;
	struct io_tctx_exit exit;
	struct io_tctx_node *node;
	int ret;

	/*
	 * If we're doing polled IO and end up having requests being
	 * submitted async (out-of-line), then completions can come in while
	 * we're waiting for refs to drop. We need to reap these manually,
	 * as nobody else will be looking for them.
	 */
	do {
		if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
			mutex_lock(&ctx->uring_lock);
			io_cqring_overflow_kill(ctx);
			mutex_unlock(&ctx->uring_lock);
		}

		if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
			io_move_task_work_from_local(ctx);

		while (io_uring_try_cancel_requests(ctx, NULL, true))
			cond_resched();

		if (ctx->sq_data) {
			struct io_sq_data *sqd = ctx->sq_data;
			struct task_struct *tsk;

			io_sq_thread_park(sqd);
			tsk = sqd->thread;
			if (tsk && tsk->io_uring && tsk->io_uring->io_wq)
				io_wq_cancel_cb(tsk->io_uring->io_wq,
						io_cancel_ctx_cb, ctx, true);
			io_sq_thread_unpark(sqd);
		}

		io_req_caches_free(ctx);

		if (WARN_ON_ONCE(time_after(jiffies, timeout))) {
			/* there is little hope left, don't run it too often */
			interval = HZ * 60;
		}
		/*
		 * This is really an uninterruptible wait, as it has to be
		 * complete. But it's also run from a kworker, which doesn't
		 * take signals, so it's fine to make it interruptible. This
		 * avoids scenarios where we knowingly can wait much longer
		 * on completions, for example if someone does a SIGSTOP on
		 * a task that needs to finish task_work to make this loop
		 * complete. That's a synthetic situation that should not
		 * cause a stuck task backtrace, and hence a potential panic
		 * on stuck tasks if that is enabled.
		 */
	} while (!wait_for_completion_interruptible_timeout(&ctx->ref_comp, interval));

	init_completion(&exit.completion);
	init_task_work(&exit.task_work, io_tctx_exit_cb);
	exit.ctx = ctx;

	mutex_lock(&ctx->uring_lock);
	while (!list_empty(&ctx->tctx_list)) {
		WARN_ON_ONCE(time_after(jiffies, timeout));

		node = list_first_entry(&ctx->tctx_list, struct io_tctx_node,
					ctx_node);
		/* don't spin on a single task if cancellation failed */
		list_rotate_left(&ctx->tctx_list);
		ret = task_work_add(node->task, &exit.task_work, TWA_SIGNAL);
		if (WARN_ON_ONCE(ret))
			continue;

		mutex_unlock(&ctx->uring_lock);
		/*
		 * See comment above for
		 * wait_for_completion_interruptible_timeout() on why this
		 * wait is marked as interruptible.
		 */
		wait_for_completion_interruptible(&exit.completion);
		mutex_lock(&ctx->uring_lock);
	}
	mutex_unlock(&ctx->uring_lock);
	spin_lock(&ctx->completion_lock);
	spin_unlock(&ctx->completion_lock);

	/* pairs with RCU read section in io_req_local_work_add() */
	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
		synchronize_rcu();

	io_ring_ctx_free(ctx);
}

static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
{
	unsigned long index;
	struct creds *creds;

	mutex_lock(&ctx->uring_lock);
	percpu_ref_kill(&ctx->refs);
	xa_for_each(&ctx->personalities, index, creds)
		io_unregister_personality(ctx, index);
	mutex_unlock(&ctx->uring_lock);

	flush_delayed_work(&ctx->fallback_work);

	INIT_WORK(&ctx->exit_work, io_ring_exit_work);
	/*
	 * Use system_unbound_wq to avoid spawning tons of event kworkers
	 * if we're exiting a ton of rings at the same time. It just adds
	 * noise and overhead, there's no discernable change in runtime
	 * over using system_wq.
	 */
	queue_work(iou_wq, &ctx->exit_work);
}

static int io_uring_release(struct inode *inode, struct file *file)
{
	struct io_ring_ctx *ctx = file->private_data;

	file->private_data = NULL;
	io_ring_ctx_wait_and_kill(ctx);
	return 0;
}

struct io_task_cancel {
	struct task_struct *task;
	bool all;
};

static bool io_cancel_task_cb(struct io_wq_work *work, void *data)
{
	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
	struct io_task_cancel *cancel = data;

	return io_match_task_safe(req, cancel->task, cancel->all);
}

static __cold bool io_cancel_defer_files(struct io_ring_ctx *ctx,
					 struct task_struct *task,
					 bool cancel_all)
{
	struct io_defer_entry *de;
	LIST_HEAD(list);

	spin_lock(&ctx->completion_lock);
	list_for_each_entry_reverse(de, &ctx->defer_list, list) {
		if (io_match_task_safe(de->req, task, cancel_all)) {
			list_cut_position(&list, &ctx->defer_list, &de->list);
			break;
		}
	}
	spin_unlock(&ctx->completion_lock);
	if (list_empty(&list))
		return false;

	while (!list_empty(&list)) {
		de = list_first_entry(&list, struct io_defer_entry, list);
		list_del_init(&de->list);
		io_req_task_queue_fail(de->req, -ECANCELED);
		kfree(de);
	}
	return true;
}

static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx)
{
	struct io_tctx_node *node;
	enum io_wq_cancel cret;
	bool ret = false;

	mutex_lock(&ctx->uring_lock);
	list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
		struct io_uring_task *tctx = node->task->io_uring;

		/*
		 * io_wq will stay alive while we hold uring_lock, because it's
		 * killed after ctx nodes, which requires taking the lock.
		 */
		if (!tctx || !tctx->io_wq)
			continue;
		cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_ctx_cb, ctx, true);
		ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
	}
	mutex_unlock(&ctx->uring_lock);

	return ret;
}

static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
						struct task_struct *task,
						bool cancel_all)
{
	struct io_task_cancel cancel = { .task = task, .all = cancel_all, };
	struct io_uring_task *tctx = task ? task->io_uring : NULL;
	enum io_wq_cancel cret;
	bool ret = false;

	/* set it so io_req_local_work_add() would wake us up */
	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
		atomic_set(&ctx->cq_wait_nr, 1);
		smp_mb();
	}

	/* failed during ring init, it couldn't have issued any requests */
	if (!ctx->rings)
		return false;

	if (!task) {
		ret |= io_uring_try_cancel_iowq(ctx);
	} else if (tctx && tctx->io_wq) {
		/*
		 * Cancels requests of all rings, not only @ctx, but
		 * it's fine as the task is in exit/exec.
		 */
		cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_task_cb,
				       &cancel, true);
		ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
	}

	/* SQPOLL thread does its own polling */
	if ((!(ctx->flags & IORING_SETUP_SQPOLL) && cancel_all) ||
	    (ctx->sq_data && ctx->sq_data->thread == current)) {
		while (!wq_list_empty(&ctx->iopoll_list)) {
			io_iopoll_try_reap_events(ctx);
			ret = true;
			cond_resched();
		}
	}

	if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
	    io_allowed_defer_tw_run(ctx))
		ret |= io_run_local_work(ctx, INT_MAX) > 0;
	ret |= io_cancel_defer_files(ctx, task, cancel_all);
	mutex_lock(&ctx->uring_lock);
	ret |= io_poll_remove_all(ctx, task, cancel_all);
	ret |= io_waitid_remove_all(ctx, task, cancel_all);
	ret |= io_futex_remove_all(ctx, task, cancel_all);
	ret |= io_uring_try_cancel_uring_cmd(ctx, task, cancel_all);
	mutex_unlock(&ctx->uring_lock);
	ret |= io_kill_timeouts(ctx, task, cancel_all);
	if (task)
		ret |= io_run_task_work() > 0;
	else
		ret |= flush_delayed_work(&ctx->fallback_work);
	return ret;
}

static s64 tctx_inflight(struct io_uring_task *tctx, bool tracked)
{
	if (tracked)
		return atomic_read(&tctx->inflight_tracked);
	return percpu_counter_sum(&tctx->inflight);
}

/*
 * Find any io_uring ctx that this task has registered or done IO on, and cancel
 * requests. @sqd should be not-null IFF it's an SQPOLL thread cancellation.
 */
__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
{
	struct io_uring_task *tctx = current->io_uring;
	struct io_ring_ctx *ctx;
	struct io_tctx_node *node;
	unsigned long index;
	s64 inflight;
	DEFINE_WAIT(wait);

	WARN_ON_ONCE(sqd && sqd->thread != current);

	if (!current->io_uring)
		return;
	if (tctx->io_wq)
		io_wq_exit_start(tctx->io_wq);

	atomic_inc(&tctx->in_cancel);
	do {
		bool loop = false;

		io_uring_drop_tctx_refs(current);
		/* read completions before cancelations */
		inflight = tctx_inflight(tctx, !cancel_all);
		if (!inflight)
			break;

		if (!sqd) {
			xa_for_each(&tctx->xa, index, node) {
				/* sqpoll task will cancel all its requests */
				if (node->ctx->sq_data)
					continue;
				loop |= io_uring_try_cancel_requests(node->ctx,
							current, cancel_all);
			}
		} else {
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				loop |= io_uring_try_cancel_requests(ctx,
								     current,
								     cancel_all);
		}

		if (loop) {
			cond_resched();
			continue;
		}

		prepare_to_wait(&tctx->wait, &wait, TASK_INTERRUPTIBLE);
		io_run_task_work();
		io_uring_drop_tctx_refs(current);
		xa_for_each(&tctx->xa, index, node) {
			if (!llist_empty(&node->ctx->work_llist)) {
				WARN_ON_ONCE(node->ctx->submitter_task &&
					     node->ctx->submitter_task != current);
				goto end_wait;
			}
		}
		/*
		 * If we've seen completions, retry without waiting. This
		 * avoids a race where a completion comes in before we did
		 * prepare_to_wait().
		 */
		if (inflight == tctx_inflight(tctx, !cancel_all))
			schedule();
end_wait:
		finish_wait(&tctx->wait, &wait);
	} while (1);

	io_uring_clean_tctx(tctx);
	if (cancel_all) {
		/*
		 * We shouldn't run task_works after cancel, so just leave
		 * ->in_cancel set for normal exit.
		 */
		atomic_dec(&tctx->in_cancel);
		/* for exec all current's requests should be gone, kill tctx */
		__io_uring_free(current);
	}
}

void __io_uring_cancel(bool cancel_all)
{
	io_uring_cancel_generic(cancel_all, NULL);
}

static int io_validate_ext_arg(unsigned flags, const void __user *argp, size_t argsz)
{
	if (flags & IORING_ENTER_EXT_ARG) {
		struct io_uring_getevents_arg arg;

		if (argsz != sizeof(arg))
			return -EINVAL;
		if (copy_from_user(&arg, argp, sizeof(arg)))
			return -EFAULT;
	}
	return 0;
}

static int io_get_ext_arg(unsigned flags, const void __user *argp, size_t *argsz,
			  struct __kernel_timespec __user **ts,
			  const sigset_t __user **sig)
{
	struct io_uring_getevents_arg arg;

	/*
	 * If EXT_ARG isn't set, then we have no timespec and the argp pointer
	 * is just a pointer to the sigset_t.
	 */
	if (!(flags & IORING_ENTER_EXT_ARG)) {
		*sig = (const sigset_t __user *) argp;
		*ts = NULL;
		return 0;
	}

	/*
	 * EXT_ARG is set - ensure we agree on the size of it and copy in our
	 * timespec and sigset_t pointers if good.
	 */
	if (*argsz != sizeof(arg))
		return -EINVAL;
	if (copy_from_user(&arg, argp, sizeof(arg)))
		return -EFAULT;
	if (arg.pad)
		return -EINVAL;
	*sig = u64_to_user_ptr(arg.sigmask);
	*argsz = arg.sigmask_sz;
	*ts = u64_to_user_ptr(arg.ts);
	return 0;
}
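
/*
 * Illustrative sketch (not part of the original source): the extended
 * argument parsed above is built by userspace roughly like this when it
 * wants both a signal mask and a wait timeout from a single
 * io_uring_enter(2) call with IORING_ENTER_EXT_ARG set:
 *
 *	struct __kernel_timespec ts = { .tv_sec = 1 };
 *	struct io_uring_getevents_arg arg = {
 *		.sigmask	= (__u64)(unsigned long)&mask,
 *		.sigmask_sz	= _NSIG / 8,
 *		.ts		= (__u64)(unsigned long)&ts,
 *	};
 *
 *	io_uring_enter(ring_fd, 0, min_complete,
 *		       IORING_ENTER_GETEVENTS | IORING_ENTER_EXT_ARG,
 *		       &arg, sizeof(arg));
 *
 * where io_uring_enter() stands for the raw syscall and &mask is a
 * sigset_t the application prepared.
 */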

SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
		u32, min_complete, u32, flags, const void __user *, argp,
		size_t, argsz)
{
	struct io_ring_ctx *ctx;
	struct file *file;
	long ret;

	if (unlikely(flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP |
			       IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG |
			       IORING_ENTER_REGISTERED_RING)))
		return -EINVAL;

	/*
	 * Ring fd has been registered via IORING_REGISTER_RING_FDS, we
	 * need only dereference our task private array to find it.
	 */
	if (flags & IORING_ENTER_REGISTERED_RING) {
		struct io_uring_task *tctx = current->io_uring;

		if (unlikely(!tctx || fd >= IO_RINGFD_REG_MAX))
			return -EINVAL;
		fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
		file = tctx->registered_rings[fd];
		if (unlikely(!file))
			return -EBADF;
	} else {
		file = fget(fd);
		if (unlikely(!file))
			return -EBADF;
		ret = -EOPNOTSUPP;
		if (unlikely(!io_is_uring_fops(file)))
			goto out;
	}

	ctx = file->private_data;
	ret = -EBADFD;
	if (unlikely(ctx->flags & IORING_SETUP_R_DISABLED))
		goto out;

	/*
	 * For SQ polling, the thread will do all submissions and completions.
	 * Just return the requested submit count, and wake the thread if
	 * we were asked to.
	 */
	ret = 0;
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		if (unlikely(ctx->sq_data->thread == NULL)) {
			ret = -EOWNERDEAD;
			goto out;
		}
		if (flags & IORING_ENTER_SQ_WAKEUP)
			wake_up(&ctx->sq_data->wait);
		if (flags & IORING_ENTER_SQ_WAIT)
			io_sqpoll_wait_sq(ctx);

		ret = to_submit;
	} else if (to_submit) {
		ret = io_uring_add_tctx_node(ctx);
		if (unlikely(ret))
			goto out;

		mutex_lock(&ctx->uring_lock);
		ret = io_submit_sqes(ctx, to_submit);
		if (ret != to_submit) {
			mutex_unlock(&ctx->uring_lock);
			goto out;
		}
		if (flags & IORING_ENTER_GETEVENTS) {
			if (ctx->syscall_iopoll)
				goto iopoll_locked;
			/*
			 * Ignore errors, we'll soon call io_cqring_wait() and
			 * it should handle ownership problems if any.
			 */
			if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
				(void)io_run_local_work_locked(ctx, min_complete);
		}
		mutex_unlock(&ctx->uring_lock);
	}

	if (flags & IORING_ENTER_GETEVENTS) {
		int ret2;

		if (ctx->syscall_iopoll) {
			/*
			 * We disallow the app entering submit/complete with
			 * polling, but we still need to lock the ring to
			 * prevent racing with polled issue that got punted to
			 * a workqueue.
			 */
			mutex_lock(&ctx->uring_lock);
iopoll_locked:
			ret2 = io_validate_ext_arg(flags, argp, argsz);
			if (likely(!ret2)) {
				min_complete = min(min_complete,
						   ctx->cq_entries);
				ret2 = io_iopoll_check(ctx, min_complete);
			}
			mutex_unlock(&ctx->uring_lock);
		} else {
			const sigset_t __user *sig;
			struct __kernel_timespec __user *ts;

			ret2 = io_get_ext_arg(flags, argp, &argsz, &ts, &sig);
			if (likely(!ret2)) {
				min_complete = min(min_complete,
						   ctx->cq_entries);
				ret2 = io_cqring_wait(ctx, min_complete, sig,
						      argsz, ts);
			}
		}

		if (!ret) {
			ret = ret2;

			/*
			 * EBADR indicates that one or more CQE were dropped.
			 * Once the user has been informed we can clear the bit
			 * as they are obviously ok with those drops.
			 */
			if (unlikely(ret2 == -EBADR))
				clear_bit(IO_CHECK_CQ_DROPPED_BIT,
					  &ctx->check_cq);
		}
	}
out:
	if (!(flags & IORING_ENTER_REGISTERED_RING))
		fput(file);
	return ret;
}
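
/*
 * Illustrative sketch (not part of the original source): with
 * IORING_SETUP_SQPOLL, the application normally only needs to enter the
 * kernel when the poller thread has gone idle. A common submission loop
 * checks the SQ flags after publishing new tail entries; sq_flags stands
 * for the mmap()ed pointer at p->sq_off.flags, and memory-ordering
 * details are omitted for brevity:
 *
 *	// ... fill SQEs and advance the SQ tail as usual ...
 *	if (*sq_flags & IORING_SQ_NEED_WAKEUP) {
 *		io_uring_enter(ring_fd, to_submit, 0,
 *			       IORING_ENTER_SQ_WAKEUP, NULL, 0);
 *	}
 *
 * where io_uring_enter() again stands for the raw syscall.
 */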

static const struct file_operations io_uring_fops = {
	.release	= io_uring_release,
	.mmap		= io_uring_mmap,
	.get_unmapped_area = io_uring_get_unmapped_area,
#ifndef CONFIG_MMU
	.mmap_capabilities = io_uring_nommu_mmap_capabilities,
#endif
	.poll		= io_uring_poll,
#ifdef CONFIG_PROC_FS
	.show_fdinfo	= io_uring_show_fdinfo,
#endif
};

bool io_is_uring_fops(struct file *file)
{
	return file->f_op == &io_uring_fops;
}

static __cold int io_allocate_scq_urings(struct io_ring_ctx *ctx,
					 struct io_uring_params *p)
{
	struct io_rings *rings;
	size_t size, sq_array_offset;
	void *ptr;

	/* make sure these are sane, as we already accounted them */
	ctx->sq_entries = p->sq_entries;
	ctx->cq_entries = p->cq_entries;

	size = rings_size(ctx, p->sq_entries, p->cq_entries, &sq_array_offset);
	if (size == SIZE_MAX)
		return -EOVERFLOW;

	if (!(ctx->flags & IORING_SETUP_NO_MMAP))
		rings = io_pages_map(&ctx->ring_pages, &ctx->n_ring_pages, size);
	else
		rings = io_rings_map(ctx, p->cq_off.user_addr, size);

	if (IS_ERR(rings))
		return PTR_ERR(rings);

	ctx->rings = rings;
	if (!(ctx->flags & IORING_SETUP_NO_SQARRAY))
		ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
	rings->sq_ring_mask = p->sq_entries - 1;
	rings->cq_ring_mask = p->cq_entries - 1;
	rings->sq_ring_entries = p->sq_entries;
	rings->cq_ring_entries = p->cq_entries;

	if (p->flags & IORING_SETUP_SQE128)
		size = array_size(2 * sizeof(struct io_uring_sqe), p->sq_entries);
	else
		size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
	if (size == SIZE_MAX) {
		io_rings_free(ctx);
		return -EOVERFLOW;
	}

	if (!(ctx->flags & IORING_SETUP_NO_MMAP))
		ptr = io_pages_map(&ctx->sqe_pages, &ctx->n_sqe_pages, size);
	else
		ptr = io_sqes_map(ctx, p->sq_off.user_addr, size);

	if (IS_ERR(ptr)) {
		io_rings_free(ctx);
		return PTR_ERR(ptr);
	}

	ctx->sq_sqes = ptr;
	return 0;
}

static int io_uring_install_fd(struct file *file)
{
	int fd;

	fd = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
	if (fd < 0)
		return fd;
	fd_install(fd, file);
	return fd;
}

Jens Axboe's avatar
Jens Axboe committed
3398 3399 3400
/*
 * Allocate an anonymous fd, this is what constitutes the application
 * visible backing of an io_uring instance. The application mmaps this
3401
 * fd to gain access to the SQ/CQ ring details.
Jens Axboe's avatar
Jens Axboe committed
3402
 */
3403
static struct file *io_uring_get_file(struct io_ring_ctx *ctx)
Jens Axboe's avatar
Jens Axboe committed
3404
{
3405
	/* Create a new inode so that the LSM can block the creation.  */
3406
	return anon_inode_create_getfile("[io_uring]", &io_uring_fops, ctx,
3407
					 O_RDWR | O_CLOEXEC, NULL);
Jens Axboe's avatar
Jens Axboe committed
3408 3409
}

3410 3411
static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
				  struct io_uring_params __user *params)
Jens Axboe's avatar
Jens Axboe committed
3412 3413
{
	struct io_ring_ctx *ctx;
3414
	struct io_uring_task *tctx;
3415
	struct file *file;
Jens Axboe's avatar
Jens Axboe committed
3416 3417
	int ret;

3418
	if (!entries)
Jens Axboe's avatar
Jens Axboe committed
3419
		return -EINVAL;
3420 3421 3422 3423 3424
	if (entries > IORING_MAX_ENTRIES) {
		if (!(p->flags & IORING_SETUP_CLAMP))
			return -EINVAL;
		entries = IORING_MAX_ENTRIES;
	}
Jens Axboe's avatar
Jens Axboe committed
3425

3426 3427 3428 3429
	if ((p->flags & IORING_SETUP_REGISTERED_FD_ONLY)
	    && !(p->flags & IORING_SETUP_NO_MMAP))
		return -EINVAL;

Jens Axboe's avatar
Jens Axboe committed
3430 3431 3432 3433
	/*
	 * Use twice as many entries for the CQ ring. It's possible for the
	 * application to drive a higher depth than the size of the SQ ring,
	 * since the sqes are only used at submission time. This allows for
3434 3435 3436
	 * some flexibility in overcommitting a bit. If the application has
	 * set IORING_SETUP_CQSIZE, it will have passed in the desired number
	 * of CQ ring entries manually.
Jens Axboe's avatar
Jens Axboe committed
3437 3438
	 */
	p->sq_entries = roundup_pow_of_two(entries);
3439 3440 3441 3442 3443 3444
	if (p->flags & IORING_SETUP_CQSIZE) {
		/*
		 * If IORING_SETUP_CQSIZE is set, we do the same roundup
		 * to a power-of-two, if it isn't already. We do NOT impose
		 * any cq vs sq ring sizing.
		 */
3445
		if (!p->cq_entries)
3446
			return -EINVAL;
3447 3448 3449 3450 3451
		if (p->cq_entries > IORING_MAX_CQ_ENTRIES) {
			if (!(p->flags & IORING_SETUP_CLAMP))
				return -EINVAL;
			p->cq_entries = IORING_MAX_CQ_ENTRIES;
		}
3452 3453 3454
		p->cq_entries = roundup_pow_of_two(p->cq_entries);
		if (p->cq_entries < p->sq_entries)
			return -EINVAL;
3455 3456 3457
	} else {
		p->cq_entries = 2 * p->sq_entries;
	}
Jens Axboe's avatar
Jens Axboe committed
3458 3459

	ctx = io_ring_ctx_alloc(p);
3460
	if (!ctx)
Jens Axboe's avatar
Jens Axboe committed
3461
		return -ENOMEM;
3462

3463 3464 3465 3466 3467
	if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
	    !(ctx->flags & IORING_SETUP_IOPOLL) &&
	    !(ctx->flags & IORING_SETUP_SQPOLL))
		ctx->task_complete = true;

3468 3469 3470
	if (ctx->task_complete || (ctx->flags & IORING_SETUP_IOPOLL))
		ctx->lockless_cq = true;

3471 3472 3473 3474 3475 3476 3477
	/*
	 * lazy poll_wq activation relies on ->task_complete for synchronisation
	 * purposes, see io_activate_pollwq()
	 */
	if (!ctx->task_complete)
		ctx->poll_activated = true;

	/*
	 * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user
	 * space applications don't need to do io completion events
	 * polling again, they can rely on io_sq_thread to do polling
	 * work, which can reduce cpu usage and uring_lock contention.
	 */
	if (ctx->flags & IORING_SETUP_IOPOLL &&
	    !(ctx->flags & IORING_SETUP_SQPOLL))
		ctx->syscall_iopoll = 1;

	ctx->compat = in_compat_syscall();
	if (!ns_capable_noaudit(&init_user_ns, CAP_IPC_LOCK))
		ctx->user = get_uid(current_user());

	/*
	 * For SQPOLL, we just need a wakeup, always. For !SQPOLL, if
	 * COOP_TASKRUN is set, then IPIs are never needed by the app.
	 */
	ret = -EINVAL;
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		/* IPI related flags don't make sense with SQPOLL */
		if (ctx->flags & (IORING_SETUP_COOP_TASKRUN |
				  IORING_SETUP_TASKRUN_FLAG |
				  IORING_SETUP_DEFER_TASKRUN))
			goto err;
		ctx->notify_method = TWA_SIGNAL_NO_IPI;
	} else if (ctx->flags & IORING_SETUP_COOP_TASKRUN) {
		ctx->notify_method = TWA_SIGNAL_NO_IPI;
	} else {
		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG &&
		    !(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
			goto err;
		ctx->notify_method = TWA_SIGNAL;
	}

	/*
	 * For DEFER_TASKRUN we require the completion task to be the same as the
	 * submission task. This implies that there is only one submitter, so enforce
	 * that.
	 */
	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN &&
	    !(ctx->flags & IORING_SETUP_SINGLE_ISSUER)) {
		goto err;
	}

	/*
	 * This is just grabbed for accounting purposes. When a process exits,
	 * the mm is exited and dropped before the files, hence we need to hang
	 * on to this mm purely for the purposes of being able to unaccount
	 * memory (locked/pinned vm). It's not used for anything else.
	 */
	mmgrab(current->mm);
	ctx->mm_account = current->mm;

	ret = io_allocate_scq_urings(ctx, p);
	if (ret)
		goto err;

	ret = io_sq_offload_create(ctx, p);
	if (ret)
		goto err;

	ret = io_rsrc_init(ctx);
	if (ret)
		goto err;

	p->sq_off.head = offsetof(struct io_rings, sq.head);
	p->sq_off.tail = offsetof(struct io_rings, sq.tail);
	p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
	p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
	p->sq_off.flags = offsetof(struct io_rings, sq_flags);
	p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
	if (!(ctx->flags & IORING_SETUP_NO_SQARRAY))
		p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
	p->sq_off.resv1 = 0;
	if (!(ctx->flags & IORING_SETUP_NO_MMAP))
		p->sq_off.user_addr = 0;

	p->cq_off.head = offsetof(struct io_rings, cq.head);
	p->cq_off.tail = offsetof(struct io_rings, cq.tail);
	p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
	p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
	p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
	p->cq_off.cqes = offsetof(struct io_rings, cqes);
	p->cq_off.flags = offsetof(struct io_rings, cq_flags);
	p->cq_off.resv1 = 0;
	if (!(ctx->flags & IORING_SETUP_NO_MMAP))
		p->cq_off.user_addr = 0;
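
	/*
	 * Illustrative only: userspace adds these offsets to the mmap()ed
	 * ring base to locate the shared fields, roughly
	 *
	 *	khead = (unsigned *)((char *)sq_ptr + p->sq_off.head);
	 *	ktail = (unsigned *)((char *)cq_ptr + p->cq_off.tail);
	 *
	 * (with IORING_FEAT_SINGLE_MMAP, cq_ptr is the same mapping as
	 * sq_ptr); this is roughly what liburing does during queue setup.
	 */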

	p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
			IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS |
			IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL |
			IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED |
			IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS |
			IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP |
			IORING_FEAT_LINKED_FILE | IORING_FEAT_REG_REG_RING;

	if (copy_to_user(params, p, sizeof(*p))) {
		ret = -EFAULT;
		goto err;
	}

	if (ctx->flags & IORING_SETUP_SINGLE_ISSUER
	    && !(ctx->flags & IORING_SETUP_R_DISABLED))
		WRITE_ONCE(ctx->submitter_task, get_task_struct(current));

	file = io_uring_get_file(ctx);
	if (IS_ERR(file)) {
		ret = PTR_ERR(file);
		goto err;
	}

	ret = __io_uring_add_tctx_node(ctx);
	if (ret)
		goto err_fput;
	tctx = current->io_uring;

	/*
	 * Install ring fd as the very last thing, so we don't risk someone
	 * having closed it before we finish setup
	 */
	if (p->flags & IORING_SETUP_REGISTERED_FD_ONLY)
		ret = io_ring_add_registered_file(tctx, file, 0, IO_RINGFD_REG_MAX);
	else
		ret = io_uring_install_fd(file);
	if (ret < 0)
		goto err_fput;

	trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
	return ret;
err:
	io_ring_ctx_wait_and_kill(ctx);
	return ret;
err_fput:
	fput(file);
	return ret;
}

/*
 * Sets up an io_uring context and returns the fd. The application asks for a
 * ring size; we return the actual sq/cq ring sizes (among other things) in the
 * params structure passed in.
 */
static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
{
	struct io_uring_params p;
	int i;

	if (copy_from_user(&p, params, sizeof(p)))
		return -EFAULT;
	for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
		if (p.resv[i])
			return -EINVAL;
	}

	if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
			IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
			IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ |
			IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL |
			IORING_SETUP_COOP_TASKRUN | IORING_SETUP_TASKRUN_FLAG |
			IORING_SETUP_SQE128 | IORING_SETUP_CQE32 |
			IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN |
			IORING_SETUP_NO_MMAP | IORING_SETUP_REGISTERED_FD_ONLY |
			IORING_SETUP_NO_SQARRAY))
		return -EINVAL;

	return io_uring_create(entries, &p, params);
}
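
/*
 * Illustrative only -- a minimal raw userspace call, without liburing
 * (error handling omitted):
 *
 *	struct io_uring_params p = { 0 };
 *	int ring_fd = syscall(__NR_io_uring_setup, 8, &p);
 *
 * On return, p.sq_entries and p.cq_entries hold the rounded-up ring sizes
 * actually allocated above.
 */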

static inline bool io_uring_allowed(void)
{
	int disabled = READ_ONCE(sysctl_io_uring_disabled);
	kgid_t io_uring_group;

	if (disabled == 2)
		return false;

	if (disabled == 0 || capable(CAP_SYS_ADMIN))
		return true;

	io_uring_group = make_kgid(&init_user_ns, sysctl_io_uring_group);
	if (!gid_valid(io_uring_group))
		return false;

	return in_group_p(io_uring_group);
}
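
/*
 * Note (descriptive only): the check above is driven by the
 * kernel.io_uring_disabled and kernel.io_uring_group sysctls: 0 allows
 * io_uring for everyone, 1 restricts it to CAP_SYS_ADMIN and members of
 * io_uring_group, 2 disables it entirely.
 */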

SYSCALL_DEFINE2(io_uring_setup, u32, entries,
		struct io_uring_params __user *, params)
{
	if (!io_uring_allowed())
		return -EPERM;

	return io_uring_setup(entries, params);
}

static int __init io_uring_init(void)
{
#define __BUILD_BUG_VERIFY_OFFSET_SIZE(stype, eoffset, esize, ename) do { \
	BUILD_BUG_ON(offsetof(stype, ename) != eoffset); \
	BUILD_BUG_ON(sizeof_field(stype, ename) != esize); \
} while (0)

#define BUILD_BUG_SQE_ELEM(eoffset, etype, ename) \
	__BUILD_BUG_VERIFY_OFFSET_SIZE(struct io_uring_sqe, eoffset, sizeof(etype), ename)
#define BUILD_BUG_SQE_ELEM_SIZE(eoffset, esize, ename) \
	__BUILD_BUG_VERIFY_OFFSET_SIZE(struct io_uring_sqe, eoffset, esize, ename)
	BUILD_BUG_ON(sizeof(struct io_uring_sqe) != 64);
	BUILD_BUG_SQE_ELEM(0,  __u8,   opcode);
	BUILD_BUG_SQE_ELEM(1,  __u8,   flags);
	BUILD_BUG_SQE_ELEM(2,  __u16,  ioprio);
	BUILD_BUG_SQE_ELEM(4,  __s32,  fd);
	BUILD_BUG_SQE_ELEM(8,  __u64,  off);
	BUILD_BUG_SQE_ELEM(8,  __u64,  addr2);
	BUILD_BUG_SQE_ELEM(8,  __u32,  cmd_op);
	BUILD_BUG_SQE_ELEM(12, __u32, __pad1);
	BUILD_BUG_SQE_ELEM(16, __u64,  addr);
	BUILD_BUG_SQE_ELEM(16, __u64,  splice_off_in);
	BUILD_BUG_SQE_ELEM(24, __u32,  len);
	BUILD_BUG_SQE_ELEM(28,     __kernel_rwf_t, rw_flags);
	BUILD_BUG_SQE_ELEM(28, /* compat */   int, rw_flags);
	BUILD_BUG_SQE_ELEM(28, /* compat */ __u32, rw_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  fsync_flags);
	BUILD_BUG_SQE_ELEM(28, /* compat */ __u16,  poll_events);
	BUILD_BUG_SQE_ELEM(28, __u32,  poll32_events);
	BUILD_BUG_SQE_ELEM(28, __u32,  sync_range_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  msg_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  timeout_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  accept_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  cancel_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  open_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  statx_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  fadvise_advice);
	BUILD_BUG_SQE_ELEM(28, __u32,  splice_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  rename_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  unlink_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  hardlink_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  xattr_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  msg_ring_flags);
	BUILD_BUG_SQE_ELEM(32, __u64,  user_data);
	BUILD_BUG_SQE_ELEM(40, __u16,  buf_index);
	BUILD_BUG_SQE_ELEM(40, __u16,  buf_group);
	BUILD_BUG_SQE_ELEM(42, __u16,  personality);
	BUILD_BUG_SQE_ELEM(44, __s32,  splice_fd_in);
	BUILD_BUG_SQE_ELEM(44, __u32,  file_index);
	BUILD_BUG_SQE_ELEM(44, __u16,  addr_len);
	BUILD_BUG_SQE_ELEM(46, __u16,  __pad3[0]);
	BUILD_BUG_SQE_ELEM(48, __u64,  addr3);
	BUILD_BUG_SQE_ELEM_SIZE(48, 0, cmd);
	BUILD_BUG_SQE_ELEM(56, __u64,  __pad2);

	BUILD_BUG_ON(sizeof(struct io_uring_files_update) !=
		     sizeof(struct io_uring_rsrc_update));
	BUILD_BUG_ON(sizeof(struct io_uring_rsrc_update) >
		     sizeof(struct io_uring_rsrc_update2));

	/* ->buf_index is u16 */
	BUILD_BUG_ON(offsetof(struct io_uring_buf_ring, bufs) != 0);
	BUILD_BUG_ON(offsetof(struct io_uring_buf, resv) !=
		     offsetof(struct io_uring_buf_ring, tail));

	/* should fit into one byte */
	BUILD_BUG_ON(SQE_VALID_FLAGS >= (1 << 8));
	BUILD_BUG_ON(SQE_COMMON_FLAGS >= (1 << 8));
	BUILD_BUG_ON((SQE_VALID_FLAGS | SQE_COMMON_FLAGS) != SQE_VALID_FLAGS);

	BUILD_BUG_ON(__REQ_F_LAST_BIT > 8 * sizeof_field(struct io_kiocb, flags));

	BUILD_BUG_ON(sizeof(atomic_t) != sizeof(u32));

	/* top 8 bits are for internal use */
	BUILD_BUG_ON((IORING_URING_CMD_MASK & 0xff000000) != 0);

	io_uring_optable_init();

	/*
	 * Allow user copy in the per-command field, which starts after the
	 * file in io_kiocb and until the opcode field. The openat2 handling
 * requires copying user memory into the io_kiocb object in that
	 * range, and HARDENED_USERCOPY will complain if we haven't
	 * correctly annotated this range.
	 */
	req_cachep = kmem_cache_create_usercopy("io_kiocb",
				sizeof(struct io_kiocb), 0,
				SLAB_HWCACHE_ALIGN | SLAB_PANIC |
				SLAB_ACCOUNT | SLAB_TYPESAFE_BY_RCU,
				offsetof(struct io_kiocb, cmd.data),
				sizeof_field(struct io_kiocb, cmd.data), NULL);
	io_buf_cachep = KMEM_CACHE(io_buffer,
					  SLAB_HWCACHE_ALIGN | SLAB_PANIC | SLAB_ACCOUNT);

	iou_wq = alloc_workqueue("iou_exit", WQ_UNBOUND, 64);

#ifdef CONFIG_SYSCTL
	register_sysctl_init("kernel", kernel_io_uring_disabled_table);
#endif

	return 0;
}
__initcall(io_uring_init);