Commit e7a6c00d authored by Jens Axboe's avatar Jens Axboe

io_uring: add support for registering ring file descriptors

Lots of workloads use multiple threads, in which case the file table is
shared between them. This makes getting and putting the ring file
descriptor for each io_uring_enter(2) system call more expensive, as it
involves an atomic get and put for each call.

Similarly to how we allow registering normal file descriptors to avoid
this overhead, add support for an io_uring_register(2) API that allows
to register the ring fds themselves:

1) IORING_REGISTER_RING_FDS - takes an array of io_uring_rsrc_update
   structs, and registers them with the task.
2) IORING_UNREGISTER_RING_FDS - takes an array of io_uring_src_update
   structs, and unregisters them.

When a ring fd is registered, it is internally represented by an offset.
This offset is returned to the application, and the application then
uses this offset and sets IORING_ENTER_REGISTERED_RING for the
io_uring_enter(2) system call. This works just like using a registered
file descriptor, rather than a real one, in an SQE, where
IOSQE_FIXED_FILE gets set to tell io_uring that we're using an internal
offset/descriptor rather than a real file descriptor.

In initial testing, this provides a nice bump in performance for
threaded applications in real world cases where the batch count (eg
number of requests submitted per io_uring_enter(2) invocation) is low.
In a microbenchmark, submitting NOP requests, we see the following
increases in performance:

Requests per syscall	Baseline	Registered	Increase
----------------------------------------------------------------
1			 ~7030K		 ~8080K		+15%
2			~13120K		~14800K		+13%
4			~22740K		~25300K		+11%
Co-developed-by: default avatarXiaoguang Wang <xiaoguang.wang@linux.alibaba.com>
Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent 63c36549
......@@ -466,6 +466,11 @@ struct io_ring_ctx {
};
};
/*
* Arbitrary limit, can be raised if need be
*/
#define IO_RINGFD_REG_MAX 16
struct io_uring_task {
/* submission side */
int cached_refs;
......@@ -481,6 +486,7 @@ struct io_uring_task {
struct io_wq_work_list task_list;
struct io_wq_work_list prior_task_list;
struct callback_head task_work;
struct file **registered_rings;
bool task_running;
};
......@@ -8788,8 +8794,16 @@ static __cold int io_uring_alloc_task_context(struct task_struct *task,
if (unlikely(!tctx))
return -ENOMEM;
tctx->registered_rings = kcalloc(IO_RINGFD_REG_MAX,
sizeof(struct file *), GFP_KERNEL);
if (unlikely(!tctx->registered_rings)) {
kfree(tctx);
return -ENOMEM;
}
ret = percpu_counter_init(&tctx->inflight, 0, GFP_KERNEL);
if (unlikely(ret)) {
kfree(tctx->registered_rings);
kfree(tctx);
return ret;
}
......@@ -8798,6 +8812,7 @@ static __cold int io_uring_alloc_task_context(struct task_struct *task,
if (IS_ERR(tctx->io_wq)) {
ret = PTR_ERR(tctx->io_wq);
percpu_counter_destroy(&tctx->inflight);
kfree(tctx->registered_rings);
kfree(tctx);
return ret;
}
......@@ -8822,6 +8837,7 @@ void __io_uring_free(struct task_struct *tsk)
WARN_ON_ONCE(tctx->io_wq);
WARN_ON_ONCE(tctx->cached_refs);
kfree(tctx->registered_rings);
percpu_counter_destroy(&tctx->inflight);
kfree(tctx);
tsk->io_uring = NULL;
......@@ -10043,6 +10059,139 @@ void __io_uring_cancel(bool cancel_all)
io_uring_cancel_generic(cancel_all, NULL);
}
void io_uring_unreg_ringfd(void)
{
struct io_uring_task *tctx = current->io_uring;
int i;
for (i = 0; i < IO_RINGFD_REG_MAX; i++) {
if (tctx->registered_rings[i]) {
fput(tctx->registered_rings[i]);
tctx->registered_rings[i] = NULL;
}
}
}
static int io_ring_add_registered_fd(struct io_uring_task *tctx, int fd,
int start, int end)
{
struct file *file;
int offset;
for (offset = start; offset < end; offset++) {
offset = array_index_nospec(offset, IO_RINGFD_REG_MAX);
if (tctx->registered_rings[offset])
continue;
file = fget(fd);
if (!file) {
return -EBADF;
} else if (file->f_op != &io_uring_fops) {
fput(file);
return -EOPNOTSUPP;
}
tctx->registered_rings[offset] = file;
return offset;
}
return -EBUSY;
}
/*
* Register a ring fd to avoid fdget/fdput for each io_uring_enter()
* invocation. User passes in an array of struct io_uring_rsrc_update
* with ->data set to the ring_fd, and ->offset given for the desired
* index. If no index is desired, application may set ->offset == -1U
* and we'll find an available index. Returns number of entries
* successfully processed, or < 0 on error if none were processed.
*/
static int io_ringfd_register(struct io_ring_ctx *ctx, void __user *__arg,
unsigned nr_args)
{
struct io_uring_rsrc_update __user *arg = __arg;
struct io_uring_rsrc_update reg;
struct io_uring_task *tctx;
int ret, i;
if (!nr_args || nr_args > IO_RINGFD_REG_MAX)
return -EINVAL;
mutex_unlock(&ctx->uring_lock);
ret = io_uring_add_tctx_node(ctx);
mutex_lock(&ctx->uring_lock);
if (ret)
return ret;
tctx = current->io_uring;
for (i = 0; i < nr_args; i++) {
int start, end;
if (copy_from_user(&reg, &arg[i], sizeof(reg))) {
ret = -EFAULT;
break;
}
if (reg.offset == -1U) {
start = 0;
end = IO_RINGFD_REG_MAX;
} else {
if (reg.offset >= IO_RINGFD_REG_MAX) {
ret = -EINVAL;
break;
}
start = reg.offset;
end = start + 1;
}
ret = io_ring_add_registered_fd(tctx, reg.data, start, end);
if (ret < 0)
break;
reg.offset = ret;
if (copy_to_user(&arg[i], &reg, sizeof(reg))) {
fput(tctx->registered_rings[reg.offset]);
tctx->registered_rings[reg.offset] = NULL;
ret = -EFAULT;
break;
}
}
return i ? i : ret;
}
static int io_ringfd_unregister(struct io_ring_ctx *ctx, void __user *__arg,
unsigned nr_args)
{
struct io_uring_rsrc_update __user *arg = __arg;
struct io_uring_task *tctx = current->io_uring;
struct io_uring_rsrc_update reg;
int ret = 0, i;
if (!nr_args || nr_args > IO_RINGFD_REG_MAX)
return -EINVAL;
if (!tctx)
return 0;
for (i = 0; i < nr_args; i++) {
if (copy_from_user(&reg, &arg[i], sizeof(reg))) {
ret = -EFAULT;
break;
}
if (reg.offset >= IO_RINGFD_REG_MAX) {
ret = -EINVAL;
break;
}
reg.offset = array_index_nospec(reg.offset, IO_RINGFD_REG_MAX);
if (tctx->registered_rings[reg.offset]) {
fput(tctx->registered_rings[reg.offset]);
tctx->registered_rings[reg.offset] = NULL;
}
}
return i ? i : ret;
}
static void *io_uring_validate_mmap_request(struct file *file,
loff_t pgoff, size_t sz)
{
......@@ -10173,12 +10322,28 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
io_run_task_work();
if (unlikely(flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP |
IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG)))
IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG |
IORING_ENTER_REGISTERED_RING)))
return -EINVAL;
/*
* Ring fd has been registered via IORING_REGISTER_RING_FDS, we
* need only dereference our task private array to find it.
*/
if (flags & IORING_ENTER_REGISTERED_RING) {
struct io_uring_task *tctx = current->io_uring;
if (!tctx || fd >= IO_RINGFD_REG_MAX)
return -EINVAL;
fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
f.file = tctx->registered_rings[fd];
if (unlikely(!f.file))
return -EBADF;
} else {
f = fdget(fd);
if (unlikely(!f.file))
return -EBADF;
}
ret = -EOPNOTSUPP;
if (unlikely(f.file->f_op != &io_uring_fops))
......@@ -10252,6 +10417,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
out:
percpu_ref_put(&ctx->refs);
out_fput:
if (!(flags & IORING_ENTER_REGISTERED_RING))
fdput(f);
return submitted ? submitted : ret;
}
......@@ -11142,6 +11308,12 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
break;
ret = io_register_iowq_max_workers(ctx, arg);
break;
case IORING_REGISTER_RING_FDS:
ret = io_ringfd_register(ctx, arg, nr_args);
break;
case IORING_UNREGISTER_RING_FDS:
ret = io_ringfd_unregister(ctx, arg, nr_args);
break;
default:
ret = -EINVAL;
break;
......
......@@ -9,11 +9,14 @@
struct sock *io_uring_get_socket(struct file *file);
void __io_uring_cancel(bool cancel_all);
void __io_uring_free(struct task_struct *tsk);
void io_uring_unreg_ringfd(void);
static inline void io_uring_files_cancel(void)
{
if (current->io_uring)
if (current->io_uring) {
io_uring_unreg_ringfd();
__io_uring_cancel(false);
}
}
static inline void io_uring_task_cancel(void)
{
......
......@@ -261,6 +261,7 @@ struct io_cqring_offsets {
#define IORING_ENTER_SQ_WAKEUP (1U << 1)
#define IORING_ENTER_SQ_WAIT (1U << 2)
#define IORING_ENTER_EXT_ARG (1U << 3)
#define IORING_ENTER_REGISTERED_RING (1U << 4)
/*
* Passed in for io_uring_setup(2). Copied back with updated info on success
......@@ -325,6 +326,10 @@ enum {
/* set/get max number of io-wq workers */
IORING_REGISTER_IOWQ_MAX_WORKERS = 19,
/* register/unregister io_uring fd with the ring */
IORING_REGISTER_RING_FDS = 20,
IORING_UNREGISTER_RING_FDS = 21,
/* this goes last */
IORING_REGISTER_LAST
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment