Commit c1b7fcf3 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'for-6.6/io_uring-2023-08-28' of git://git.kernel.dk/linux

Pull io_uring updates from Jens Axboe:
 "Fairly quiet round in terms of features, mostly just improvements all
  over the map for existing code. In detail:

   - Initial support for socket operations through io_uring. Latter half
     of this will likely land with the 6.7 kernel, then allowing things
     like get/setsockopt (Breno)

   - Cleanup of the cancel code, and then adding support for canceling
     requests with the opcode as the key (me)

   - Improvements for the io-wq locking (me)

   - Fix affinity setting for SQPOLL based io-wq (me)

   - Remove the io_uring userspace code. These were added initially as
     copies from liburing, but all of them have since bitrotted and are
     way out of date at this point. Rather than attempt to keep them in
     sync, just get rid of them. People will have liburing available
     anyway for these examples. (Pavel)

   - Series improving the CQ/SQ ring caching (Pavel)

   - Misc fixes and cleanups (Pavel, Yue, me)"

* tag 'for-6.6/io_uring-2023-08-28' of git://git.kernel.dk/linux: (47 commits)
  io_uring: move iopoll ctx fields around
  io_uring: move multishot cqe cache in ctx
  io_uring: separate task_work/waiting cache line
  io_uring: banish non-hot data to end of io_ring_ctx
  io_uring: move non aligned field to the end
  io_uring: add option to remove SQ indirection
  io_uring: compact SQ/CQ heads/tails
  io_uring: force inline io_fill_cqe_req
  io_uring: merge iopoll and normal completion paths
  io_uring: reorder cqring_flush and wakeups
  io_uring: optimise extra io_get_cqe null check
  io_uring: refactor __io_get_cqe()
  io_uring: simplify big_cqe handling
  io_uring: cqe init hardening
  io_uring: improve cqe !tracing hot path
  io_uring/rsrc: Annotate struct io_mapped_ubuf with __counted_by
  io_uring/sqpoll: fix io-wq affinity when IORING_SETUP_SQPOLL is used
  io_uring: simplify io_run_task_work_sig return
  io_uring/rsrc: keep one global dummy_ubuf
  io_uring: never overflow io_aux_cqe
  ...
parents adfd6716 644c4a7a
......@@ -10966,7 +10966,6 @@ F: include/linux/io_uring_types.h
F: include/trace/events/io_uring.h
F: include/uapi/linux/io_uring.h
F: io_uring/
F: tools/io_uring/
IPMI SUBSYSTEM
M: Corey Minyard <minyard@acm.org>
......
......@@ -81,6 +81,7 @@ static inline void io_uring_free(struct task_struct *tsk)
if (tsk->io_uring)
__io_uring_free(tsk);
}
int io_uring_cmd_sock(struct io_uring_cmd *cmd, unsigned int issue_flags);
#else
static inline int io_uring_cmd_import_fixed(u64 ubuf, unsigned long len, int rw,
struct iov_iter *iter, void *ioucmd)
......@@ -116,6 +117,11 @@ static inline const char *io_uring_get_opcode(u8 opcode)
{
return "";
}
static inline int io_uring_cmd_sock(struct io_uring_cmd *cmd,
unsigned int issue_flags)
{
return -EOPNOTSUPP;
}
#endif
#endif
......@@ -69,8 +69,8 @@ struct io_uring_task {
};
struct io_uring {
u32 head ____cacheline_aligned_in_smp;
u32 tail ____cacheline_aligned_in_smp;
u32 head;
u32 tail;
};
/*
......@@ -176,7 +176,6 @@ struct io_submit_state {
unsigned short submit_nr;
unsigned int cqes_count;
struct blk_plug plug;
struct io_uring_cqe cqes[16];
};
struct io_ev_fd {
......@@ -205,25 +204,17 @@ struct io_ring_ctx {
unsigned int has_evfd: 1;
/* all CQEs should be posted only by the submitter task */
unsigned int task_complete: 1;
unsigned int lockless_cq: 1;
unsigned int syscall_iopoll: 1;
unsigned int poll_activated: 1;
unsigned int drain_disabled: 1;
unsigned int compat: 1;
enum task_work_notify_mode notify_method;
/*
* If IORING_SETUP_NO_MMAP is used, then the below holds
* the gup'ed pages for the two rings, and the sqes.
*/
unsigned short n_ring_pages;
unsigned short n_sqe_pages;
struct page **ring_pages;
struct page **sqe_pages;
struct io_rings *rings;
struct task_struct *submitter_task;
struct io_rings *rings;
struct percpu_ref refs;
enum task_work_notify_mode notify_method;
} ____cacheline_aligned_in_smp;
/* submission data */
......@@ -261,31 +252,20 @@ struct io_ring_ctx {
struct io_buffer_list *io_bl;
struct xarray io_bl_xa;
struct list_head io_buffers_cache;
struct io_hash_table cancel_table_locked;
struct list_head cq_overflow_list;
struct io_alloc_cache apoll_cache;
struct io_alloc_cache netmsg_cache;
} ____cacheline_aligned_in_smp;
/* IRQ completion list, under ->completion_lock */
struct io_wq_work_list locked_free_list;
unsigned int locked_free_nr;
const struct cred *sq_creds; /* cred used for __io_sq_thread() */
struct io_sq_data *sq_data; /* if using sq thread polling */
struct wait_queue_head sqo_sq_wait;
struct list_head sqd_list;
unsigned long check_cq;
unsigned int file_alloc_start;
unsigned int file_alloc_end;
struct xarray personalities;
u32 pers_next;
/*
* ->iopoll_list is protected by the ctx->uring_lock for
* io_uring instances that don't use IORING_SETUP_SQPOLL.
* For SQPOLL, only the single threaded io_sq_thread() will
* manipulate the list, hence no extra locking is needed there.
*/
struct io_wq_work_list iopoll_list;
bool poll_multi_queue;
} ____cacheline_aligned_in_smp;
struct {
/*
......@@ -298,39 +278,55 @@ struct io_ring_ctx {
unsigned cached_cq_tail;
unsigned cq_entries;
struct io_ev_fd __rcu *io_ev_fd;
struct wait_queue_head cq_wait;
unsigned cq_extra;
} ____cacheline_aligned_in_smp;
struct {
spinlock_t completion_lock;
bool poll_multi_queue;
atomic_t cq_wait_nr;
/*
* ->iopoll_list is protected by the ctx->uring_lock for
* io_uring instances that don't use IORING_SETUP_SQPOLL.
* For SQPOLL, only the single threaded io_sq_thread() will
* manipulate the list, hence no extra locking is needed there.
* task_work and async notification delivery cacheline. Expected to
* regularly bounce b/w CPUs.
*/
struct io_wq_work_list iopoll_list;
struct io_hash_table cancel_table;
struct {
struct llist_head work_llist;
struct list_head io_buffers_comp;
unsigned long check_cq;
atomic_t cq_wait_nr;
atomic_t cq_timeouts;
struct wait_queue_head cq_wait;
} ____cacheline_aligned_in_smp;
/* timeouts */
struct {
spinlock_t timeout_lock;
atomic_t cq_timeouts;
struct list_head timeout_list;
struct list_head ltimeout_list;
unsigned cq_last_tm_flush;
} ____cacheline_aligned_in_smp;
struct io_uring_cqe completion_cqes[16];
spinlock_t completion_lock;
/* IRQ completion list, under ->completion_lock */
struct io_wq_work_list locked_free_list;
unsigned int locked_free_nr;
struct list_head io_buffers_comp;
struct list_head cq_overflow_list;
struct io_hash_table cancel_table;
const struct cred *sq_creds; /* cred used for __io_sq_thread() */
struct io_sq_data *sq_data; /* if using sq thread polling */
struct wait_queue_head sqo_sq_wait;
struct list_head sqd_list;
unsigned int file_alloc_start;
unsigned int file_alloc_end;
struct xarray personalities;
u32 pers_next;
struct list_head io_buffers_cache;
/* Keep this last, we don't need it for the fast path */
struct wait_queue_head poll_wq;
struct io_restriction restrictions;
......@@ -374,6 +370,15 @@ struct io_ring_ctx {
unsigned sq_thread_idle;
/* protected by ->completion_lock */
unsigned evfd_last_cq_tail;
/*
* If IORING_SETUP_NO_MMAP is used, then the below holds
* the gup'ed pages for the two rings, and the sqes.
*/
unsigned short n_ring_pages;
unsigned short n_sqe_pages;
struct page **ring_pages;
struct page **sqe_pages;
};
struct io_tw_state {
......@@ -409,7 +414,6 @@ enum {
REQ_F_SINGLE_POLL_BIT,
REQ_F_DOUBLE_POLL_BIT,
REQ_F_PARTIAL_IO_BIT,
REQ_F_CQE32_INIT_BIT,
REQ_F_APOLL_MULTISHOT_BIT,
REQ_F_CLEAR_POLLIN_BIT,
REQ_F_HASH_LOCKED_BIT,
......@@ -479,8 +483,6 @@ enum {
REQ_F_PARTIAL_IO = BIT(REQ_F_PARTIAL_IO_BIT),
/* fast poll multishot mode */
REQ_F_APOLL_MULTISHOT = BIT(REQ_F_APOLL_MULTISHOT_BIT),
/* ->extra1 and ->extra2 are initialised */
REQ_F_CQE32_INIT = BIT(REQ_F_CQE32_INIT_BIT),
/* recvmsg special flag, clear EPOLLIN */
REQ_F_CLEAR_POLLIN = BIT(REQ_F_CLEAR_POLLIN_BIT),
/* hashed into ->cancel_hash_locked, protected by ->uring_lock */
......@@ -579,13 +581,7 @@ struct io_kiocb {
struct io_task_work io_task_work;
unsigned nr_tw;
/* for polled requests, i.e. IORING_OP_POLL_ADD and async armed poll */
union {
struct hlist_node hash_node;
struct {
u64 extra1;
u64 extra2;
};
};
/* internal polling, see IORING_FEAT_FAST_POLL */
struct async_poll *apoll;
/* opcode allocated if it needs to store data for async defer */
......@@ -595,6 +591,11 @@ struct io_kiocb {
/* custom credentials, valid IFF REQ_F_CREDS is set */
const struct cred *creds;
struct io_wq_work work;
struct {
u64 extra1;
u64 extra2;
} big_cqe;
};
struct io_overflow_cqe {
......
......@@ -185,6 +185,11 @@ enum {
*/
#define IORING_SETUP_REGISTERED_FD_ONLY (1U << 15)
/*
* Removes indirection through the SQ index array.
*/
#define IORING_SETUP_NO_SQARRAY (1U << 16)
enum io_uring_op {
IORING_OP_NOP,
IORING_OP_READV,
......@@ -299,11 +304,15 @@ enum io_uring_op {
* request 'user_data'
* IORING_ASYNC_CANCEL_ANY Match any request
* IORING_ASYNC_CANCEL_FD_FIXED 'fd' passed in is a fixed descriptor
* IORING_ASYNC_CANCEL_USERDATA Match on user_data, default for no other key
* IORING_ASYNC_CANCEL_OP Match request based on opcode
*/
#define IORING_ASYNC_CANCEL_ALL (1U << 0)
#define IORING_ASYNC_CANCEL_FD (1U << 1)
#define IORING_ASYNC_CANCEL_ANY (1U << 2)
#define IORING_ASYNC_CANCEL_FD_FIXED (1U << 3)
#define IORING_ASYNC_CANCEL_USERDATA (1U << 4)
#define IORING_ASYNC_CANCEL_OP (1U << 5)
/*
* send/sendmsg and recv/recvmsg flags (sqe->ioprio)
......@@ -697,7 +706,9 @@ struct io_uring_sync_cancel_reg {
__s32 fd;
__u32 flags;
struct __kernel_timespec timeout;
__u64 pad[4];
__u8 opcode;
__u8 pad[7];
__u64 pad2[3];
};
/*
......@@ -717,6 +728,14 @@ struct io_uring_recvmsg_out {
__u32 flags;
};
/*
* Argument for IORING_OP_URING_CMD when file is a socket
*/
enum {
SOCKET_URING_OP_SIOCINQ = 0,
SOCKET_URING_OP_SIOCOUTQ,
};
#ifdef __cplusplus
}
#endif
......
......@@ -22,35 +22,56 @@ struct io_cancel {
u64 addr;
u32 flags;
s32 fd;
u8 opcode;
};
#define CANCEL_FLAGS (IORING_ASYNC_CANCEL_ALL | IORING_ASYNC_CANCEL_FD | \
IORING_ASYNC_CANCEL_ANY | IORING_ASYNC_CANCEL_FD_FIXED)
IORING_ASYNC_CANCEL_ANY | IORING_ASYNC_CANCEL_FD_FIXED | \
IORING_ASYNC_CANCEL_USERDATA | IORING_ASYNC_CANCEL_OP)
static bool io_cancel_cb(struct io_wq_work *work, void *data)
/*
* Returns true if the request matches the criteria outlined by 'cd'.
*/
bool io_cancel_req_match(struct io_kiocb *req, struct io_cancel_data *cd)
{
struct io_kiocb *req = container_of(work, struct io_kiocb, work);
struct io_cancel_data *cd = data;
bool match_user_data = cd->flags & IORING_ASYNC_CANCEL_USERDATA;
if (req->ctx != cd->ctx)
return false;
if (cd->flags & IORING_ASYNC_CANCEL_ANY) {
;
} else if (cd->flags & IORING_ASYNC_CANCEL_FD) {
if (!(cd->flags & (IORING_ASYNC_CANCEL_FD | IORING_ASYNC_CANCEL_OP)))
match_user_data = true;
if (cd->flags & IORING_ASYNC_CANCEL_ANY)
goto check_seq;
if (cd->flags & IORING_ASYNC_CANCEL_FD) {
if (req->file != cd->file)
return false;
} else {
if (req->cqe.user_data != cd->data)
}
if (cd->flags & IORING_ASYNC_CANCEL_OP) {
if (req->opcode != cd->opcode)
return false;
}
if (cd->flags & (IORING_ASYNC_CANCEL_ALL|IORING_ASYNC_CANCEL_ANY)) {
if (match_user_data && req->cqe.user_data != cd->data)
return false;
if (cd->flags & IORING_ASYNC_CANCEL_ALL) {
check_seq:
if (cd->seq == req->work.cancel_seq)
return false;
req->work.cancel_seq = cd->seq;
}
return true;
}
static bool io_cancel_cb(struct io_wq_work *work, void *data)
{
struct io_kiocb *req = container_of(work, struct io_kiocb, work);
struct io_cancel_data *cd = data;
return io_cancel_req_match(req, cd);
}
static int io_async_cancel_one(struct io_uring_task *tctx,
struct io_cancel_data *cd)
{
......@@ -111,7 +132,7 @@ int io_async_cancel_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
if (unlikely(req->flags & REQ_F_BUFFER_SELECT))
return -EINVAL;
if (sqe->off || sqe->len || sqe->splice_fd_in)
if (sqe->off || sqe->splice_fd_in)
return -EINVAL;
cancel->addr = READ_ONCE(sqe->addr);
......@@ -123,6 +144,11 @@ int io_async_cancel_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
return -EINVAL;
cancel->fd = READ_ONCE(sqe->fd);
}
if (cancel->flags & IORING_ASYNC_CANCEL_OP) {
if (cancel->flags & IORING_ASYNC_CANCEL_ANY)
return -EINVAL;
cancel->opcode = READ_ONCE(sqe->len);
}
return 0;
}
......@@ -169,6 +195,7 @@ int io_async_cancel(struct io_kiocb *req, unsigned int issue_flags)
.ctx = req->ctx,
.data = cancel->addr,
.flags = cancel->flags,
.opcode = cancel->opcode,
.seq = atomic_inc_return(&req->ctx->cancel_seq),
};
struct io_uring_task *tctx = req->task->io_uring;
......@@ -238,17 +265,22 @@ int io_sync_cancel(struct io_ring_ctx *ctx, void __user *arg)
struct io_uring_sync_cancel_reg sc;
struct fd f = { };
DEFINE_WAIT(wait);
int ret;
int ret, i;
if (copy_from_user(&sc, arg, sizeof(sc)))
return -EFAULT;
if (sc.flags & ~CANCEL_FLAGS)
return -EINVAL;
if (sc.pad[0] || sc.pad[1] || sc.pad[2] || sc.pad[3])
for (i = 0; i < ARRAY_SIZE(sc.pad); i++)
if (sc.pad[i])
return -EINVAL;
for (i = 0; i < ARRAY_SIZE(sc.pad2); i++)
if (sc.pad2[i])
return -EINVAL;
cd.data = sc.addr;
cd.flags = sc.flags;
cd.opcode = sc.opcode;
/* we can grab a normal file descriptor upfront */
if ((cd.flags & IORING_ASYNC_CANCEL_FD) &&
......
......@@ -8,11 +8,11 @@ struct io_cancel_data {
u64 data;
struct file *file;
};
u8 opcode;
u32 flags;
int seq;
};
int io_async_cancel_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
int io_async_cancel(struct io_kiocb *req, unsigned int issue_flags);
......@@ -21,3 +21,4 @@ int io_try_cancel(struct io_uring_task *tctx, struct io_cancel_data *cd,
void init_hash_table(struct io_hash_table *table, unsigned size);
int io_sync_cancel(struct io_ring_ctx *ctx, void __user *arg);
bool io_cancel_req_match(struct io_kiocb *req, struct io_cancel_data *cd);
......@@ -46,9 +46,13 @@ static __cold int io_uring_show_cred(struct seq_file *m, unsigned int id,
return 0;
}
static __cold void __io_uring_show_fdinfo(struct io_ring_ctx *ctx,
struct seq_file *m)
/*
* Caller holds a reference to the file already, we don't need to do
* anything else to get an extra reference.
*/
__cold void io_uring_show_fdinfo(struct seq_file *m, struct file *f)
{
struct io_ring_ctx *ctx = f->private_data;
struct io_sq_data *sq = NULL;
struct io_overflow_cqe *ocqe;
struct io_rings *r = ctx->rings;
......@@ -203,14 +207,4 @@ static __cold void __io_uring_show_fdinfo(struct io_ring_ctx *ctx,
spin_unlock(&ctx->completion_lock);
}
__cold void io_uring_show_fdinfo(struct seq_file *m, struct file *f)
{
struct io_ring_ctx *ctx = f->private_data;
if (percpu_ref_tryget(&ctx->refs)) {
__io_uring_show_fdinfo(ctx, m);
percpu_ref_put(&ctx->refs);
}
}
#endif
......@@ -232,17 +232,25 @@ static void io_worker_exit(struct io_worker *worker)
do_exit(0);
}
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
bool ret = false;
return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
!wq_list_empty(&acct->work_list);
}
/*
* If there's work to do, returns true with acct->lock acquired. If not,
* returns false with no lock held.
*/
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
__acquires(&acct->lock)
{
raw_spin_lock(&acct->lock);
if (!wq_list_empty(&acct->work_list) &&
!test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
ret = true;
raw_spin_unlock(&acct->lock);
if (__io_acct_run_queue(acct))
return true;
return ret;
raw_spin_unlock(&acct->lock);
return false;
}
/*
......@@ -268,12 +276,15 @@ static bool io_wq_activate_free_worker(struct io_wq *wq,
io_worker_release(worker);
continue;
}
if (wake_up_process(worker->task)) {
/*
* If the worker is already running, it's either already
* starting work or finishing work. In either case, if it does
* to go sleep, we'll kick off a new task for this work anyway.
*/
wake_up_process(worker->task);
io_worker_release(worker);
return true;
}
io_worker_release(worker);
}
return false;
}
......@@ -397,6 +408,7 @@ static void io_wq_dec_running(struct io_worker *worker)
if (!io_acct_run_queue(acct))
return;
raw_spin_unlock(&acct->lock);
atomic_inc(&acct->nr_running);
atomic_inc(&wq->worker_refs);
io_queue_worker_create(worker, acct, create_worker_cb);
......@@ -521,9 +533,13 @@ static void io_assign_current_work(struct io_worker *worker,
raw_spin_unlock(&worker->lock);
}
static void io_worker_handle_work(struct io_worker *worker)
/*
* Called with acct->lock held, drops it before returning
*/
static void io_worker_handle_work(struct io_wq_acct *acct,
struct io_worker *worker)
__releases(&acct->lock)
{
struct io_wq_acct *acct = io_wq_get_acct(worker);
struct io_wq *wq = worker->wq;
bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
......@@ -537,7 +553,6 @@ static void io_worker_handle_work(struct io_worker *worker)
* can't make progress, any work completion or insertion will
* clear the stalled flag.
*/
raw_spin_lock(&acct->lock);
work = io_get_next_work(acct, worker);
raw_spin_unlock(&acct->lock);
if (work) {
......@@ -591,6 +606,10 @@ static void io_worker_handle_work(struct io_worker *worker)
wake_up(&wq->hash->wait);
}
} while (work);
if (!__io_acct_run_queue(acct))
break;
raw_spin_lock(&acct->lock);
} while (1);
}
......@@ -611,8 +630,13 @@ static int io_wq_worker(void *data)
long ret;
set_current_state(TASK_INTERRUPTIBLE);
/*
* If we have work to do, io_acct_run_queue() returns with
* the acct->lock held. If not, it will drop it.
*/
while (io_acct_run_queue(acct))
io_worker_handle_work(worker);
io_worker_handle_work(acct, worker);
raw_spin_lock(&wq->lock);
/*
......@@ -645,8 +669,8 @@ static int io_wq_worker(void *data)
}
}
if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
io_worker_handle_work(worker);
if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
io_worker_handle_work(acct, worker);
io_worker_exit(worker);
return 0;
......@@ -909,13 +933,10 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
raw_spin_unlock(&acct->lock);
raw_spin_lock(&wq->lock);
rcu_read_lock();
do_create = !io_wq_activate_free_worker(wq, acct);
rcu_read_unlock();
raw_spin_unlock(&wq->lock);
if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
!atomic_read(&acct->nr_running))) {
bool did_create;
......@@ -1285,13 +1306,16 @@ static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
return __io_wq_cpu_online(wq, cpu, false);
}
int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
if (!tctx || !tctx->io_wq)
return -EINVAL;
rcu_read_lock();
if (mask)
cpumask_copy(wq->cpu_mask, mask);
cpumask_copy(tctx->io_wq->cpu_mask, mask);
else
cpumask_copy(wq->cpu_mask, cpu_possible_mask);
cpumask_copy(tctx->io_wq->cpu_mask, cpu_possible_mask);
rcu_read_unlock();
return 0;
......
......@@ -50,7 +50,7 @@ void io_wq_put_and_exit(struct io_wq *wq);
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work);
void io_wq_hash_work(struct io_wq_work *work, void *val);
int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask);
int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask);
int io_wq_max_workers(struct io_wq *wq, int *new_count);
static inline bool io_wq_is_hashed(struct io_wq_work *work)
......
This diff is collapsed.
......@@ -38,14 +38,13 @@ enum {
IOU_STOP_MULTISHOT = -ECANCELED,
};
struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow);
bool io_req_cqe_overflow(struct io_kiocb *req);
bool io_cqe_cache_refill(struct io_ring_ctx *ctx, bool overflow);
void io_req_cqe_overflow(struct io_kiocb *req);
int io_run_task_work_sig(struct io_ring_ctx *ctx);
void io_req_defer_failed(struct io_kiocb *req, s32 res);
void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags);
bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags);
bool io_aux_cqe(const struct io_kiocb *req, bool defer, s32 res, u32 cflags,
bool allow_overflow);
bool io_fill_cqe_req_aux(struct io_kiocb *req, bool defer, s32 res, u32 cflags);
void __io_commit_cqring_flush(struct io_ring_ctx *ctx);
struct page **io_pin_pages(unsigned long ubuf, unsigned long len, int *npages);
......@@ -73,7 +72,7 @@ int io_ring_add_registered_file(struct io_uring_task *tctx, struct file *file,
int io_poll_issue(struct io_kiocb *req, struct io_tw_state *ts);
int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr);
int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin);
void io_free_batch_list(struct io_ring_ctx *ctx, struct io_wq_work_node *node);
void __io_submit_flush_completions(struct io_ring_ctx *ctx);
int io_req_prep_async(struct io_kiocb *req);
struct io_wq_work *io_wq_free_work(struct io_wq_work *work);
......@@ -110,30 +109,30 @@ static inline void io_req_task_work_add(struct io_kiocb *req)
#define io_for_each_link(pos, head) \
for (pos = (head); pos; pos = pos->link)
static inline struct io_uring_cqe *io_get_cqe_overflow(struct io_ring_ctx *ctx,
static inline bool io_get_cqe_overflow(struct io_ring_ctx *ctx,
struct io_uring_cqe **ret,
bool overflow)
{
io_lockdep_assert_cq_locked(ctx);
if (likely(ctx->cqe_cached < ctx->cqe_sentinel)) {
struct io_uring_cqe *cqe = ctx->cqe_cached;
if (unlikely(ctx->cqe_cached >= ctx->cqe_sentinel)) {
if (unlikely(!io_cqe_cache_refill(ctx, overflow)))
return false;
}
*ret = ctx->cqe_cached;
ctx->cached_cq_tail++;
ctx->cqe_cached++;
if (ctx->flags & IORING_SETUP_CQE32)
ctx->cqe_cached++;
return cqe;
}
return __io_get_cqe(ctx, overflow);
return true;
}
static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx)
static inline bool io_get_cqe(struct io_ring_ctx *ctx, struct io_uring_cqe **ret)
{
return io_get_cqe_overflow(ctx, false);
return io_get_cqe_overflow(ctx, ret, false);
}
static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,
static __always_inline bool io_fill_cqe_req(struct io_ring_ctx *ctx,
struct io_kiocb *req)
{
struct io_uring_cqe *cqe;
......@@ -143,37 +142,20 @@ static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,
* submission (by quite a lot). Increment the overflow count in
* the ring.
*/
cqe = io_get_cqe(ctx);
if (unlikely(!cqe))
if (unlikely(!io_get_cqe(ctx, &cqe)))
return false;
if (trace_io_uring_complete_enabled())
trace_io_uring_complete(req->ctx, req, req->cqe.user_data,
req->cqe.res, req->cqe.flags,
(req->flags & REQ_F_CQE32_INIT) ? req->extra1 : 0,
(req->flags & REQ_F_CQE32_INIT) ? req->extra2 : 0);
req->big_cqe.extra1, req->big_cqe.extra2);
memcpy(cqe, &req->cqe, sizeof(*cqe));
if (ctx->flags & IORING_SETUP_CQE32) {
u64 extra1 = 0, extra2 = 0;
if (req->flags & REQ_F_CQE32_INIT) {
extra1 = req->extra1;
extra2 = req->extra2;
memcpy(cqe->big_cqe, &req->big_cqe, sizeof(*cqe));
memset(&req->big_cqe, 0, sizeof(req->big_cqe));
}
WRITE_ONCE(cqe->big_cqe[0], extra1);
WRITE_ONCE(cqe->big_cqe[1], extra2);
}
return true;
}
static inline bool io_fill_cqe_req(struct io_ring_ctx *ctx,
struct io_kiocb *req)
{
if (likely(__io_fill_cqe_req(ctx, req)))
return true;
return io_req_cqe_overflow(req);
}
static inline void req_set_fail(struct io_kiocb *req)
......@@ -196,10 +178,10 @@ static inline bool req_has_async_data(struct io_kiocb *req)
return req->flags & REQ_F_ASYNC_DATA;
}
static inline void io_put_file(struct file *file)
static inline void io_put_file(struct io_kiocb *req)
{
if (file)
fput(file);
if (!(req->flags & REQ_F_FIXED_FILE) && req->file)
fput(req->file);
}
static inline void io_ring_submit_unlock(struct io_ring_ctx *ctx,
......@@ -354,7 +336,6 @@ static inline struct io_kiocb *io_extract_req(struct io_ring_ctx *ctx)
struct io_kiocb *req;
req = container_of(ctx->submit_state.free_list.next, struct io_kiocb, comp_list);
kasan_unpoison_object_data(req_cachep, req);
wq_stack_extract(&ctx->submit_state.free_list);
return req;
}
......
......@@ -641,8 +641,8 @@ static inline bool io_recv_finish(struct io_kiocb *req, int *ret,
}
if (!mshot_finished) {
if (io_aux_cqe(req, issue_flags & IO_URING_F_COMPLETE_DEFER,
*ret, cflags | IORING_CQE_F_MORE, true)) {
if (io_fill_cqe_req_aux(req, issue_flags & IO_URING_F_COMPLETE_DEFER,
*ret, cflags | IORING_CQE_F_MORE)) {
io_recv_prep_retry(req);
/* Known not-empty or unknown state, retry */
if (cflags & IORING_CQE_F_SOCK_NONEMPTY ||
......@@ -1366,8 +1366,8 @@ int io_accept(struct io_kiocb *req, unsigned int issue_flags)
if (ret < 0)
return ret;
if (io_aux_cqe(req, issue_flags & IO_URING_F_COMPLETE_DEFER, ret,
IORING_CQE_F_MORE, true))
if (io_fill_cqe_req_aux(req, issue_flags & IO_URING_F_COMPLETE_DEFER,
ret, IORING_CQE_F_MORE))
goto retry;
return -ECANCELED;
......
......@@ -300,8 +300,8 @@ static int io_poll_check_events(struct io_kiocb *req, struct io_tw_state *ts)
__poll_t mask = mangle_poll(req->cqe.res &
req->apoll_events);
if (!io_aux_cqe(req, ts->locked, mask,
IORING_CQE_F_MORE, false)) {
if (!io_fill_cqe_req_aux(req, ts->locked, mask,
IORING_CQE_F_MORE)) {
io_req_set_res(req, mask, 0);
return IOU_POLL_REMOVE_POLL_USE_RES;
}
......@@ -824,15 +824,11 @@ static struct io_kiocb *io_poll_file_find(struct io_ring_ctx *ctx,
spin_lock(&hb->lock);
hlist_for_each_entry(req, &hb->list, hash_node) {
if (!(cd->flags & IORING_ASYNC_CANCEL_ANY) &&
req->file != cd->file)
continue;
if (cd->seq == req->work.cancel_seq)
continue;
req->work.cancel_seq = cd->seq;
if (io_cancel_req_match(req, cd)) {
*out_bucket = hb;
return req;
}
}
spin_unlock(&hb->lock);
}
return NULL;
......@@ -855,7 +851,8 @@ static int __io_poll_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd,
struct io_hash_bucket *bucket;
struct io_kiocb *req;
if (cd->flags & (IORING_ASYNC_CANCEL_FD|IORING_ASYNC_CANCEL_ANY))
if (cd->flags & (IORING_ASYNC_CANCEL_FD | IORING_ASYNC_CANCEL_OP |
IORING_ASYNC_CANCEL_ANY))
req = io_poll_file_find(ctx, cd, table, &bucket);
else
req = io_poll_find(ctx, false, cd, table, &bucket);
......@@ -972,8 +969,8 @@ int io_poll_add(struct io_kiocb *req, unsigned int issue_flags)
int io_poll_remove(struct io_kiocb *req, unsigned int issue_flags)
{
struct io_poll_update *poll_update = io_kiocb_to_cmd(req, struct io_poll_update);
struct io_cancel_data cd = { .data = poll_update->old_user_data, };
struct io_ring_ctx *ctx = req->ctx;
struct io_cancel_data cd = { .ctx = ctx, .data = poll_update->old_user_data, };
struct io_hash_bucket *bucket;
struct io_kiocb *preq;
int ret2, ret = 0;
......
......@@ -33,6 +33,12 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov,
#define IORING_MAX_FIXED_FILES (1U << 20)
#define IORING_MAX_REG_BUFFERS (1U << 14)
static const struct io_mapped_ubuf dummy_ubuf = {
/* set invalid range, so io_import_fixed() fails meeting it */
.ubuf = -1UL,
.ubuf_end = 0,
};
int __io_account_mem(struct user_struct *user, unsigned long nr_pages)
{
unsigned long page_limit, cur_pages, new_pages;
......@@ -132,7 +138,7 @@ static void io_buffer_unmap(struct io_ring_ctx *ctx, struct io_mapped_ubuf **slo
struct io_mapped_ubuf *imu = *slot;
unsigned int i;
if (imu != ctx->dummy_ubuf) {
if (imu != &dummy_ubuf) {
for (i = 0; i < imu->nr_bvecs; i++)
unpin_user_page(imu->bvec[i].bv_page);
if (imu->acct_pages)
......@@ -459,14 +465,14 @@ static int __io_sqe_buffers_update(struct io_ring_ctx *ctx,
break;
i = array_index_nospec(up->offset + done, ctx->nr_user_bufs);
if (ctx->user_bufs[i] != ctx->dummy_ubuf) {
if (ctx->user_bufs[i] != &dummy_ubuf) {
err = io_queue_rsrc_removal(ctx->buf_data, i,
ctx->user_bufs[i]);
if (unlikely(err)) {
io_buffer_unmap(ctx, &imu);
break;
}
ctx->user_bufs[i] = ctx->dummy_ubuf;
ctx->user_bufs[i] = (struct io_mapped_ubuf *)&dummy_ubuf;
}
ctx->user_bufs[i] = imu;
......@@ -1077,7 +1083,7 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov,
int ret, nr_pages, i;
struct folio *folio = NULL;
*pimu = ctx->dummy_ubuf;
*pimu = (struct io_mapped_ubuf *)&dummy_ubuf;
if (!iov->iov_base)
return 0;
......
......@@ -54,10 +54,9 @@ struct io_mapped_ubuf {
u64 ubuf_end;
unsigned int nr_bvecs;
unsigned long acct_pages;
struct bio_vec bvec[];
struct bio_vec bvec[] __counted_by(nr_bvecs);
};
void io_rsrc_put_tw(struct callback_head *cb);
void io_rsrc_node_ref_zero(struct io_rsrc_node *node);
void io_rsrc_node_destroy(struct io_ring_ctx *ctx, struct io_rsrc_node *ref_node);
struct io_rsrc_node *io_rsrc_node_alloc(struct io_ring_ctx *ctx);
......
......@@ -989,13 +989,6 @@ int io_write(struct io_kiocb *req, unsigned int issue_flags)
return ret;
}
static void io_cqring_ev_posted_iopoll(struct io_ring_ctx *ctx)
{
io_commit_cqring_flush(ctx);
if (ctx->flags & IORING_SETUP_SQPOLL)
io_cqring_wake(ctx);
}
void io_rw_fail(struct io_kiocb *req)
{
int res;
......@@ -1066,24 +1059,17 @@ int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin)
if (!smp_load_acquire(&req->iopoll_completed))
break;
nr_events++;
if (unlikely(req->flags & REQ_F_CQE_SKIP))
continue;
req->cqe.flags = io_put_kbuf(req, 0);
if (unlikely(!__io_fill_cqe_req(ctx, req))) {
spin_lock(&ctx->completion_lock);
io_req_cqe_overflow(req);
spin_unlock(&ctx->completion_lock);
}
}
if (unlikely(!nr_events))
return 0;
io_commit_cqring(ctx);
io_cqring_ev_posted_iopoll(ctx);
pos = start ? start->next : ctx->iopoll_list.first;
wq_list_cut(&ctx->iopoll_list, prev, start);
io_free_batch_list(ctx, pos);
if (WARN_ON_ONCE(!wq_list_empty(&ctx->submit_state.compl_reqs)))
return 0;
ctx->submit_state.compl_reqs.first = pos;
__io_submit_flush_completions(ctx);
return nr_events;
}
......@@ -68,7 +68,7 @@ int io_tee(struct io_kiocb *req, unsigned int issue_flags)
ret = do_tee(in, out, sp->len, flags);
if (!(sp->flags & SPLICE_F_FD_IN_FIXED))
io_put_file(in);
fput(in);
done:
if (ret != sp->len)
req_set_fail(req);
......@@ -112,7 +112,7 @@ int io_splice(struct io_kiocb *req, unsigned int issue_flags)
ret = do_splice(in, poff_in, out, poff_out, sp->len, flags);
if (!(sp->flags & SPLICE_F_FD_IN_FIXED))
io_put_file(in);
fput(in);
done:
if (ret != sp->len)
req_set_fail(req);
......
......@@ -421,3 +421,18 @@ __cold int io_sq_offload_create(struct io_ring_ctx *ctx,
io_sq_thread_finish(ctx);
return ret;
}
__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
cpumask_var_t mask)
{
struct io_sq_data *sqd = ctx->sq_data;
int ret = -EINVAL;
if (sqd) {
io_sq_thread_park(sqd);
ret = io_wq_cpu_affinity(sqd->thread->io_uring, mask);
io_sq_thread_unpark(sqd);
}
return ret;
}
......@@ -27,3 +27,4 @@ void io_sq_thread_park(struct io_sq_data *sqd);
void io_sq_thread_unpark(struct io_sq_data *sqd);
void io_put_sq_data(struct io_sq_data *sqd);
void io_sqpoll_wait_sq(struct io_ring_ctx *ctx);
int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx, cpumask_var_t mask);
......@@ -73,8 +73,8 @@ static void io_timeout_complete(struct io_kiocb *req, struct io_tw_state *ts)
if (!io_timeout_finish(timeout, data)) {
bool filled;
filled = io_aux_cqe(req, ts->locked, -ETIME, IORING_CQE_F_MORE,
false);
filled = io_fill_cqe_req_aux(req, ts->locked, -ETIME,
IORING_CQE_F_MORE);
if (filled) {
/* re-arm timer */
spin_lock_irq(&ctx->timeout_lock);
......@@ -268,17 +268,11 @@ static struct io_kiocb *io_timeout_extract(struct io_ring_ctx *ctx,
list_for_each_entry(timeout, &ctx->timeout_list, list) {
struct io_kiocb *tmp = cmd_to_io_kiocb(timeout);
if (!(cd->flags & IORING_ASYNC_CANCEL_ANY) &&
cd->data != tmp->cqe.user_data)
continue;
if (cd->flags & (IORING_ASYNC_CANCEL_ALL|IORING_ASYNC_CANCEL_ANY)) {
if (cd->seq == tmp->work.cancel_seq)
continue;
tmp->work.cancel_seq = cd->seq;
}
if (io_cancel_req_match(tmp, cd)) {
req = tmp;
break;
}
}
if (!req)
return ERR_PTR(-ENOENT);
......@@ -409,7 +403,7 @@ static int io_timeout_update(struct io_ring_ctx *ctx, __u64 user_data,
struct timespec64 *ts, enum hrtimer_mode mode)
__must_hold(&ctx->timeout_lock)
{
struct io_cancel_data cd = { .data = user_data, };
struct io_cancel_data cd = { .ctx = ctx, .data = user_data, };
struct io_kiocb *req = io_timeout_extract(ctx, &cd);
struct io_timeout *timeout = io_kiocb_to_cmd(req, struct io_timeout);
struct io_timeout_data *data;
......@@ -473,7 +467,7 @@ int io_timeout_remove(struct io_kiocb *req, unsigned int issue_flags)
int ret;
if (!(tr->flags & IORING_TIMEOUT_UPDATE)) {
struct io_cancel_data cd = { .data = tr->addr, };
struct io_cancel_data cd = { .ctx = ctx, .data = tr->addr, };
spin_lock(&ctx->completion_lock);
ret = io_timeout_cancel(ctx, &cd);
......
......@@ -7,6 +7,7 @@
#include <linux/nospec.h>
#include <uapi/linux/io_uring.h>
#include <uapi/asm-generic/ioctls.h>
#include "io_uring.h"
#include "rsrc.h"
......@@ -42,9 +43,8 @@ EXPORT_SYMBOL_GPL(io_uring_cmd_do_in_task_lazy);
static inline void io_req_set_cqe32_extra(struct io_kiocb *req,
u64 extra1, u64 extra2)
{
req->extra1 = extra1;
req->extra2 = extra2;
req->flags |= REQ_F_CQE32_INIT;
req->big_cqe.extra1 = extra1;
req->big_cqe.extra2 = extra2;
}
/*
......@@ -164,3 +164,30 @@ int io_uring_cmd_import_fixed(u64 ubuf, unsigned long len, int rw,
return io_import_fixed(rw, iter, req->imu, ubuf, len);
}
EXPORT_SYMBOL_GPL(io_uring_cmd_import_fixed);
int io_uring_cmd_sock(struct io_uring_cmd *cmd, unsigned int issue_flags)
{
struct socket *sock = cmd->file->private_data;
struct sock *sk = sock->sk;
struct proto *prot = READ_ONCE(sk->sk_prot);
int ret, arg = 0;
if (!prot || !prot->ioctl)
return -EOPNOTSUPP;
switch (cmd->sqe->cmd_op) {
case SOCKET_URING_OP_SIOCINQ:
ret = prot->ioctl(sk, SIOCINQ, &arg);
if (ret)
return ret;
return arg;
case SOCKET_URING_OP_SIOCOUTQ:
ret = prot->ioctl(sk, SIOCOUTQ, &arg);
if (ret)
return ret;
return arg;
default:
return -EOPNOTSUPP;
}
}
EXPORT_SYMBOL_GPL(io_uring_cmd_sock);
......@@ -88,6 +88,7 @@
#include <linux/xattr.h>
#include <linux/nospec.h>
#include <linux/indirect_call_wrapper.h>
#include <linux/io_uring.h>
#include <linux/uaccess.h>
#include <asm/unistd.h>
......@@ -160,6 +161,7 @@ static const struct file_operations socket_file_ops = {
#ifdef CONFIG_COMPAT
.compat_ioctl = compat_sock_ioctl,
#endif
.uring_cmd = io_uring_cmd_sock,
.mmap = sock_mmap,
.release = sock_close,
.fasync = sock_fasync,
......
# SPDX-License-Identifier: GPL-2.0
# Makefile for io_uring test tools
CFLAGS += -Wall -Wextra -g -D_GNU_SOURCE
LDLIBS += -lpthread
all: io_uring-cp io_uring-bench
%: %.c
$(CC) $(CFLAGS) -o $@ $^
io_uring-bench: syscall.o io_uring-bench.o
$(CC) $(CFLAGS) -o $@ $^ $(LDLIBS)
io_uring-cp: setup.o syscall.o queue.o
clean:
$(RM) io_uring-cp io_uring-bench *.o
.PHONY: all clean
This directory includes a few programs that demonstrate how to use io_uring
in an application. The examples are:
io_uring-cp
A very basic io_uring implementation of cp(1). It takes two
arguments, copies the first argument to the second. This example
is part of liburing, and hence uses the simplified liburing API
for setting up an io_uring instance, submitting IO, completing IO,
etc. The support functions in queue.c and setup.c are straight
out of liburing.
io_uring-bench
Benchmark program that does random reads on a number of files. This
app demonstrates the various features of io_uring, like fixed files,
fixed buffers, and polled IO. There are options in the program to
control which features to use. Arguments is the file (or files) that
io_uring-bench should operate on. This uses the raw io_uring
interface.
liburing can be cloned with git here:
git://git.kernel.dk/liburing
and contains a number of unit tests as well for testing io_uring. It also
comes with man pages for the three system calls.
Fio includes an io_uring engine, you can clone fio here:
git://git.kernel.dk/fio
#ifndef LIBURING_BARRIER_H
#define LIBURING_BARRIER_H
#if defined(__x86_64) || defined(__i386__)
#define read_barrier() __asm__ __volatile__("":::"memory")
#define write_barrier() __asm__ __volatile__("":::"memory")
#else
/*
* Add arch appropriate definitions. Be safe and use full barriers for
* archs we don't have support for.
*/
#define read_barrier() __sync_synchronize()
#define write_barrier() __sync_synchronize()
#endif
#endif
This diff is collapsed.
// SPDX-License-Identifier: GPL-2.0
/*
* Simple test program that demonstrates a file copy through io_uring. This
* uses the API exposed by liburing.
*
* Copyright (C) 2018-2019 Jens Axboe
*/
#include <stdio.h>
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include "liburing.h"
#define QD 64
#define BS (32*1024)
static int infd, outfd;
struct io_data {
int read;
off_t first_offset, offset;
size_t first_len;
struct iovec iov;
};
static int setup_context(unsigned entries, struct io_uring *ring)
{
int ret;
ret = io_uring_queue_init(entries, ring, 0);
if (ret < 0) {
fprintf(stderr, "queue_init: %s\n", strerror(-ret));
return -1;
}
return 0;
}
static int get_file_size(int fd, off_t *size)
{
struct stat st;
if (fstat(fd, &st) < 0)
return -1;
if (S_ISREG(st.st_mode)) {
*size = st.st_size;
return 0;
} else if (S_ISBLK(st.st_mode)) {
unsigned long long bytes;
if (ioctl(fd, BLKGETSIZE64, &bytes) != 0)
return -1;
*size = bytes;
return 0;
}
return -1;
}
static void queue_prepped(struct io_uring *ring, struct io_data *data)
{
struct io_uring_sqe *sqe;
sqe = io_uring_get_sqe(ring);
assert(sqe);
if (data->read)
io_uring_prep_readv(sqe, infd, &data->iov, 1, data->offset);
else
io_uring_prep_writev(sqe, outfd, &data->iov, 1, data->offset);
io_uring_sqe_set_data(sqe, data);
}
static int queue_read(struct io_uring *ring, off_t size, off_t offset)
{
struct io_uring_sqe *sqe;
struct io_data *data;
data = malloc(size + sizeof(*data));
if (!data)
return 1;
sqe = io_uring_get_sqe(ring);
if (!sqe) {
free(data);
return 1;
}
data->read = 1;
data->offset = data->first_offset = offset;
data->iov.iov_base = data + 1;
data->iov.iov_len = size;
data->first_len = size;
io_uring_prep_readv(sqe, infd, &data->iov, 1, offset);
io_uring_sqe_set_data(sqe, data);
return 0;
}
static void queue_write(struct io_uring *ring, struct io_data *data)
{
data->read = 0;
data->offset = data->first_offset;
data->iov.iov_base = data + 1;
data->iov.iov_len = data->first_len;
queue_prepped(ring, data);
io_uring_submit(ring);
}
static int copy_file(struct io_uring *ring, off_t insize)
{
unsigned long reads, writes;
struct io_uring_cqe *cqe;
off_t write_left, offset;
int ret;
write_left = insize;
writes = reads = offset = 0;
while (insize || write_left) {
int had_reads, got_comp;
/*
* Queue up as many reads as we can
*/
had_reads = reads;
while (insize) {
off_t this_size = insize;
if (reads + writes >= QD)
break;
if (this_size > BS)
this_size = BS;
else if (!this_size)
break;
if (queue_read(ring, this_size, offset))
break;
insize -= this_size;
offset += this_size;
reads++;
}
if (had_reads != reads) {
ret = io_uring_submit(ring);
if (ret < 0) {
fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
break;
}
}
/*
* Queue is full at this point. Find at least one completion.
*/
got_comp = 0;
while (write_left) {
struct io_data *data;
if (!got_comp) {
ret = io_uring_wait_cqe(ring, &cqe);
got_comp = 1;
} else {
ret = io_uring_peek_cqe(ring, &cqe);
if (ret == -EAGAIN) {
cqe = NULL;
ret = 0;
}
}
if (ret < 0) {
fprintf(stderr, "io_uring_peek_cqe: %s\n",
strerror(-ret));
return 1;
}
if (!cqe)
break;
data = io_uring_cqe_get_data(cqe);
if (cqe->res < 0) {
if (cqe->res == -EAGAIN) {
queue_prepped(ring, data);
io_uring_cqe_seen(ring, cqe);
continue;
}
fprintf(stderr, "cqe failed: %s\n",
strerror(-cqe->res));
return 1;
} else if (cqe->res != data->iov.iov_len) {
/* Short read/write, adjust and requeue */
data->iov.iov_base += cqe->res;
data->iov.iov_len -= cqe->res;
data->offset += cqe->res;
queue_prepped(ring, data);
io_uring_cqe_seen(ring, cqe);
continue;
}
/*
* All done. if write, nothing else to do. if read,
* queue up corresponding write.
*/
if (data->read) {
queue_write(ring, data);
write_left -= data->first_len;
reads--;
writes++;
} else {
free(data);
writes--;
}
io_uring_cqe_seen(ring, cqe);
}
}
/* wait out pending writes */
while (writes) {
struct io_data *data;
ret = io_uring_wait_cqe(ring, &cqe);
if (ret) {
fprintf(stderr, "wait_cqe=%d\n", ret);
return 1;
}
if (cqe->res < 0) {
fprintf(stderr, "write res=%d\n", cqe->res);
return 1;
}
data = io_uring_cqe_get_data(cqe);
free(data);
writes--;
io_uring_cqe_seen(ring, cqe);
}
return 0;
}
int main(int argc, char *argv[])
{
struct io_uring ring;
off_t insize;
int ret;
if (argc < 3) {
printf("%s: infile outfile\n", argv[0]);
return 1;
}
infd = open(argv[1], O_RDONLY);
if (infd < 0) {
perror("open infile");
return 1;
}
outfd = open(argv[2], O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (outfd < 0) {
perror("open outfile");
return 1;
}
if (setup_context(QD, &ring))
return 1;
if (get_file_size(infd, &insize))
return 1;
ret = copy_file(&ring, insize);
close(infd);
close(outfd);
io_uring_queue_exit(&ring);
return ret;
}
#ifndef LIB_URING_H
#define LIB_URING_H
#ifdef __cplusplus
extern "C" {
#endif
#include <sys/uio.h>
#include <signal.h>
#include <string.h>
#include "../../include/uapi/linux/io_uring.h"
#include <inttypes.h>
#include <linux/swab.h>
#include "barrier.h"
/*
* Library interface to io_uring
*/
struct io_uring_sq {
unsigned *khead;
unsigned *ktail;
unsigned *kring_mask;
unsigned *kring_entries;
unsigned *kflags;
unsigned *kdropped;
unsigned *array;
struct io_uring_sqe *sqes;
unsigned sqe_head;
unsigned sqe_tail;
size_t ring_sz;
};
struct io_uring_cq {
unsigned *khead;
unsigned *ktail;
unsigned *kring_mask;
unsigned *kring_entries;
unsigned *koverflow;
struct io_uring_cqe *cqes;
size_t ring_sz;
};
struct io_uring {
struct io_uring_sq sq;
struct io_uring_cq cq;
int ring_fd;
};
/*
* System calls
*/
extern int io_uring_setup(unsigned entries, struct io_uring_params *p);
extern int io_uring_enter(int fd, unsigned to_submit,
unsigned min_complete, unsigned flags, sigset_t *sig);
extern int io_uring_register(int fd, unsigned int opcode, void *arg,
unsigned int nr_args);
/*
* Library interface
*/
extern int io_uring_queue_init(unsigned entries, struct io_uring *ring,
unsigned flags);
extern int io_uring_queue_mmap(int fd, struct io_uring_params *p,
struct io_uring *ring);
extern void io_uring_queue_exit(struct io_uring *ring);
extern int io_uring_peek_cqe(struct io_uring *ring,
struct io_uring_cqe **cqe_ptr);
extern int io_uring_wait_cqe(struct io_uring *ring,
struct io_uring_cqe **cqe_ptr);
extern int io_uring_submit(struct io_uring *ring);
extern struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring);
/*
* Must be called after io_uring_{peek,wait}_cqe() after the cqe has
* been processed by the application.
*/
static inline void io_uring_cqe_seen(struct io_uring *ring,
struct io_uring_cqe *cqe)
{
if (cqe) {
struct io_uring_cq *cq = &ring->cq;
(*cq->khead)++;
/*
* Ensure that the kernel sees our new head, the kernel has
* the matching read barrier.
*/
write_barrier();
}
}
/*
* Command prep helpers
*/
static inline void io_uring_sqe_set_data(struct io_uring_sqe *sqe, void *data)
{
sqe->user_data = (unsigned long) data;
}
static inline void *io_uring_cqe_get_data(struct io_uring_cqe *cqe)
{
return (void *) (uintptr_t) cqe->user_data;
}
static inline void io_uring_prep_rw(int op, struct io_uring_sqe *sqe, int fd,
const void *addr, unsigned len,
off_t offset)
{
memset(sqe, 0, sizeof(*sqe));
sqe->opcode = op;
sqe->fd = fd;
sqe->off = offset;
sqe->addr = (unsigned long) addr;
sqe->len = len;
}
static inline void io_uring_prep_readv(struct io_uring_sqe *sqe, int fd,
const struct iovec *iovecs,
unsigned nr_vecs, off_t offset)
{
io_uring_prep_rw(IORING_OP_READV, sqe, fd, iovecs, nr_vecs, offset);
}
static inline void io_uring_prep_read_fixed(struct io_uring_sqe *sqe, int fd,
void *buf, unsigned nbytes,
off_t offset)
{
io_uring_prep_rw(IORING_OP_READ_FIXED, sqe, fd, buf, nbytes, offset);
}
static inline void io_uring_prep_writev(struct io_uring_sqe *sqe, int fd,
const struct iovec *iovecs,
unsigned nr_vecs, off_t offset)
{
io_uring_prep_rw(IORING_OP_WRITEV, sqe, fd, iovecs, nr_vecs, offset);
}
static inline void io_uring_prep_write_fixed(struct io_uring_sqe *sqe, int fd,
const void *buf, unsigned nbytes,
off_t offset)
{
io_uring_prep_rw(IORING_OP_WRITE_FIXED, sqe, fd, buf, nbytes, offset);
}
static inline void io_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd,
unsigned poll_mask)
{
memset(sqe, 0, sizeof(*sqe));
sqe->opcode = IORING_OP_POLL_ADD;
sqe->fd = fd;
#if __BYTE_ORDER == __BIG_ENDIAN
poll_mask = __swahw32(poll_mask);
#endif
sqe->poll_events = poll_mask;
}
static inline void io_uring_prep_poll_remove(struct io_uring_sqe *sqe,
void *user_data)
{
memset(sqe, 0, sizeof(*sqe));
sqe->opcode = IORING_OP_POLL_REMOVE;
sqe->addr = (unsigned long) user_data;
}
static inline void io_uring_prep_fsync(struct io_uring_sqe *sqe, int fd,
unsigned fsync_flags)
{
memset(sqe, 0, sizeof(*sqe));
sqe->opcode = IORING_OP_FSYNC;
sqe->fd = fd;
sqe->fsync_flags = fsync_flags;
}
static inline void io_uring_prep_nop(struct io_uring_sqe *sqe)
{
memset(sqe, 0, sizeof(*sqe));
sqe->opcode = IORING_OP_NOP;
}
#ifdef __cplusplus
}
#endif
#endif
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include "liburing.h"
#include "barrier.h"
static int __io_uring_get_cqe(struct io_uring *ring,
struct io_uring_cqe **cqe_ptr, int wait)
{
struct io_uring_cq *cq = &ring->cq;
const unsigned mask = *cq->kring_mask;
unsigned head;
int ret;
*cqe_ptr = NULL;
head = *cq->khead;
do {
/*
* It's necessary to use a read_barrier() before reading
* the CQ tail, since the kernel updates it locklessly. The
* kernel has the matching store barrier for the update. The
* kernel also ensures that previous stores to CQEs are ordered
* with the tail update.
*/
read_barrier();
if (head != *cq->ktail) {
*cqe_ptr = &cq->cqes[head & mask];
break;
}
if (!wait)
break;
ret = io_uring_enter(ring->ring_fd, 0, 1,
IORING_ENTER_GETEVENTS, NULL);
if (ret < 0)
return -errno;
} while (1);
return 0;
}
/*
* Return an IO completion, if one is readily available. Returns 0 with
* cqe_ptr filled in on success, -errno on failure.
*/
int io_uring_peek_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr)
{
return __io_uring_get_cqe(ring, cqe_ptr, 0);
}
/*
* Return an IO completion, waiting for it if necessary. Returns 0 with
* cqe_ptr filled in on success, -errno on failure.
*/
int io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr)
{
return __io_uring_get_cqe(ring, cqe_ptr, 1);
}
/*
* Submit sqes acquired from io_uring_get_sqe() to the kernel.
*
* Returns number of sqes submitted
*/
int io_uring_submit(struct io_uring *ring)
{
struct io_uring_sq *sq = &ring->sq;
const unsigned mask = *sq->kring_mask;
unsigned ktail, ktail_next, submitted, to_submit;
int ret;
/*
* If we have pending IO in the kring, submit it first. We need a
* read barrier here to match the kernels store barrier when updating
* the SQ head.
*/
read_barrier();
if (*sq->khead != *sq->ktail) {
submitted = *sq->kring_entries;
goto submit;
}
if (sq->sqe_head == sq->sqe_tail)
return 0;
/*
* Fill in sqes that we have queued up, adding them to the kernel ring
*/
submitted = 0;
ktail = ktail_next = *sq->ktail;
to_submit = sq->sqe_tail - sq->sqe_head;
while (to_submit--) {
ktail_next++;
read_barrier();
sq->array[ktail & mask] = sq->sqe_head & mask;
ktail = ktail_next;
sq->sqe_head++;
submitted++;
}
if (!submitted)
return 0;
if (*sq->ktail != ktail) {
/*
* First write barrier ensures that the SQE stores are updated
* with the tail update. This is needed so that the kernel
* will never see a tail update without the preceeding sQE
* stores being done.
*/
write_barrier();
*sq->ktail = ktail;
/*
* The kernel has the matching read barrier for reading the
* SQ tail.
*/
write_barrier();
}
submit:
ret = io_uring_enter(ring->ring_fd, submitted, 0,
IORING_ENTER_GETEVENTS, NULL);
if (ret < 0)
return -errno;
return ret;
}
/*
* Return an sqe to fill. Application must later call io_uring_submit()
* when it's ready to tell the kernel about it. The caller may call this
* function multiple times before calling io_uring_submit().
*
* Returns a vacant sqe, or NULL if we're full.
*/
struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring)
{
struct io_uring_sq *sq = &ring->sq;
unsigned next = sq->sqe_tail + 1;
struct io_uring_sqe *sqe;
/*
* All sqes are used
*/
if (next - sq->sqe_head > *sq->kring_entries)
return NULL;
sqe = &sq->sqes[sq->sqe_tail & *sq->kring_mask];
sq->sqe_tail = next;
return sqe;
}
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include "liburing.h"
static int io_uring_mmap(int fd, struct io_uring_params *p,
struct io_uring_sq *sq, struct io_uring_cq *cq)
{
size_t size;
void *ptr;
int ret;
sq->ring_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned);
ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
if (ptr == MAP_FAILED)
return -errno;
sq->khead = ptr + p->sq_off.head;
sq->ktail = ptr + p->sq_off.tail;
sq->kring_mask = ptr + p->sq_off.ring_mask;
sq->kring_entries = ptr + p->sq_off.ring_entries;
sq->kflags = ptr + p->sq_off.flags;
sq->kdropped = ptr + p->sq_off.dropped;
sq->array = ptr + p->sq_off.array;
size = p->sq_entries * sizeof(struct io_uring_sqe);
sq->sqes = mmap(0, size, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd,
IORING_OFF_SQES);
if (sq->sqes == MAP_FAILED) {
ret = -errno;
err:
munmap(sq->khead, sq->ring_sz);
return ret;
}
cq->ring_sz = p->cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe);
ptr = mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
if (ptr == MAP_FAILED) {
ret = -errno;
munmap(sq->sqes, p->sq_entries * sizeof(struct io_uring_sqe));
goto err;
}
cq->khead = ptr + p->cq_off.head;
cq->ktail = ptr + p->cq_off.tail;
cq->kring_mask = ptr + p->cq_off.ring_mask;
cq->kring_entries = ptr + p->cq_off.ring_entries;
cq->koverflow = ptr + p->cq_off.overflow;
cq->cqes = ptr + p->cq_off.cqes;
return 0;
}
/*
* For users that want to specify sq_thread_cpu or sq_thread_idle, this
* interface is a convenient helper for mmap()ing the rings.
* Returns -1 on error, or zero on success. On success, 'ring'
* contains the necessary information to read/write to the rings.
*/
int io_uring_queue_mmap(int fd, struct io_uring_params *p, struct io_uring *ring)
{
int ret;
memset(ring, 0, sizeof(*ring));
ret = io_uring_mmap(fd, p, &ring->sq, &ring->cq);
if (!ret)
ring->ring_fd = fd;
return ret;
}
/*
* Returns -1 on error, or zero on success. On success, 'ring'
* contains the necessary information to read/write to the rings.
*/
int io_uring_queue_init(unsigned entries, struct io_uring *ring, unsigned flags)
{
struct io_uring_params p;
int fd, ret;
memset(&p, 0, sizeof(p));
p.flags = flags;
fd = io_uring_setup(entries, &p);
if (fd < 0)
return fd;
ret = io_uring_queue_mmap(fd, &p, ring);
if (ret)
close(fd);
return ret;
}
void io_uring_queue_exit(struct io_uring *ring)
{
struct io_uring_sq *sq = &ring->sq;
struct io_uring_cq *cq = &ring->cq;
munmap(sq->sqes, *sq->kring_entries * sizeof(struct io_uring_sqe));
munmap(sq->khead, sq->ring_sz);
munmap(cq->khead, cq->ring_sz);
close(ring->ring_fd);
}
/*
* Will go away once libc support is there
*/
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/uio.h>
#include <signal.h>
#include "liburing.h"
#ifdef __alpha__
/*
* alpha is the only exception, all other architectures
* have common numbers for new system calls.
*/
# ifndef __NR_io_uring_setup
# define __NR_io_uring_setup 535
# endif
# ifndef __NR_io_uring_enter
# define __NR_io_uring_enter 536
# endif
# ifndef __NR_io_uring_register
# define __NR_io_uring_register 537
# endif
#else /* !__alpha__ */
# ifndef __NR_io_uring_setup
# define __NR_io_uring_setup 425
# endif
# ifndef __NR_io_uring_enter
# define __NR_io_uring_enter 426
# endif
# ifndef __NR_io_uring_register
# define __NR_io_uring_register 427
# endif
#endif
int io_uring_register(int fd, unsigned int opcode, void *arg,
unsigned int nr_args)
{
return syscall(__NR_io_uring_register, fd, opcode, arg, nr_args);
}
int io_uring_setup(unsigned int entries, struct io_uring_params *p)
{
return syscall(__NR_io_uring_setup, entries, p);
}
int io_uring_enter(int fd, unsigned int to_submit, unsigned int min_complete,
unsigned int flags, sigset_t *sig)
{
return syscall(__NR_io_uring_enter, fd, to_submit, min_complete,
flags, sig, _NSIG / 8);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment