Commit 5ce3307b authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'for-linus-20190502' of git://git.kernel.dk/linux-block

Pull io_uring fixes from Jens Axboe:
 "This is mostly io_uring fixes/tweaks. Most of these were actually done
  in time for the last -rc, but I wanted to ensure that everything
  tested out great before including them. The code delta looks larger
  than it really is, as it's mostly just comment additions/changes.

  Outside of the comment additions/changes, this is mostly removal of
  unnecessary barriers. In all, this pull request contains:

   - Tweak to how we handle errors at submission time. We now post a
     completion event if the error occurs on behalf of an sqe, instead
     of returning it through the system call. If the error happens
     outside of a specific sqe, we return the error through the system
     call. This makes it nicer to use and makes the "normal" use case
     behave the same as the offload cases. (me)

   - Fix for a missing req reference drop from async context (me)

   - If an sqe is submitted with RWF_NOWAIT, don't punt it to async
     context. Return -EAGAIN directly, instead of using it as a hint to
     do async punt. (Stefan)

   - Fix notes on barriers (Stefan)

   - Remove unnecessary barriers (Stefan)

   - Fix potential double free of memory in setup error (Mark)

   - Further improve sq poll CPU validation (Mark)

   - Fix page allocation warning and leak on buffer registration error
     (Mark)

   - Fix iov_iter_type() for new no-ref flag (Ming)

   - Fix a case where dio doesn't honor bio no-page-ref (Ming)"

* tag 'for-linus-20190502' of git://git.kernel.dk/linux-block:
  io_uring: avoid page allocation warnings
  iov_iter: fix iov_iter_type
  block: fix handling for BIO_NO_PAGE_REF
  io_uring: drop req submit reference always in async punt
  io_uring: free allocated io_memory once
  io_uring: fix SQPOLL cpu validation
  io_uring: have submission side sqe errors post a cqe
  io_uring: remove unnecessary barrier after unsetting IORING_SQ_NEED_WAKEUP
  io_uring: remove unnecessary barrier after incrementing dropped counter
  io_uring: remove unnecessary barrier before reading SQ tail
  io_uring: remove unnecessary barrier after updating SQ head
  io_uring: remove unnecessary barrier before reading cq head
  io_uring: remove unnecessary barrier before wq_has_sleeper
  io_uring: fix notes on barriers
  io_uring: fix handling SQEs requesting NOWAIT
parents b7a5b22b d4ef6475
......@@ -264,7 +264,8 @@ __blkdev_direct_IO_simple(struct kiocb *iocb, struct iov_iter *iter,
bio_for_each_segment_all(bvec, &bio, i, iter_all) {
if (should_dirty && !PageCompound(bvec->bv_page))
set_page_dirty_lock(bvec->bv_page);
put_page(bvec->bv_page);
if (!bio_flagged(&bio, BIO_NO_PAGE_REF))
put_page(bvec->bv_page);
}
if (unlikely(bio.bi_status))
......
......@@ -4,15 +4,28 @@
* supporting fast/efficient IO.
*
* A note on the read/write ordering memory barriers that are matched between
* the application and kernel side. When the application reads the CQ ring
* tail, it must use an appropriate smp_rmb() to order with the smp_wmb()
* the kernel uses after writing the tail. Failure to do so could cause a
* delay in when the application notices that completion events available.
* This isn't a fatal condition. Likewise, the application must use an
* appropriate smp_wmb() both before writing the SQ tail, and after writing
* the SQ tail. The first one orders the sqe writes with the tail write, and
* the latter is paired with the smp_rmb() the kernel will issue before
* reading the SQ tail on submission.
* the application and kernel side.
*
* After the application reads the CQ ring tail, it must use an
* appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
* before writing the tail (using smp_load_acquire to read the tail will
* do). It also needs a smp_mb() before updating CQ head (ordering the
* entry load(s) with the head store), pairing with an implicit barrier
* through a control-dependency in io_get_cqring (smp_store_release to
* store head will do). Failure to do so could lead to reading invalid
* CQ entries.
*
* Likewise, the application must use an appropriate smp_wmb() before
* writing the SQ tail (ordering SQ entry stores with the tail store),
* which pairs with smp_load_acquire in io_get_sqring (smp_store_release
* to store the tail will do). And it needs a barrier ordering the SQ
* head load before writing new SQ entries (smp_load_acquire to read
* head will do).
*
* When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
* needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
* updating the SQ tail; a full memory barrier smp_mb() is needed
* between.
*
* Also see the examples in the liburing library:
*
......@@ -70,20 +83,108 @@ struct io_uring {
u32 tail ____cacheline_aligned_in_smp;
};
/*
* This data is shared with the application through the mmap at offset
* IORING_OFF_SQ_RING.
*
* The offsets to the member fields are published through struct
* io_sqring_offsets when calling io_uring_setup.
*/
struct io_sq_ring {
/*
* Head and tail offsets into the ring; the offsets need to be
* masked to get valid indices.
*
* The kernel controls head and the application controls tail.
*/
struct io_uring r;
/*
* Bitmask to apply to head and tail offsets (constant, equals
* ring_entries - 1)
*/
u32 ring_mask;
/* Ring size (constant, power of 2) */
u32 ring_entries;
/*
* Number of invalid entries dropped by the kernel due to
* invalid index stored in array
*
* Written by the kernel, shouldn't be modified by the
* application (i.e. get number of "new events" by comparing to
* cached value).
*
* After a new SQ head value was read by the application this
* counter includes all submissions that were dropped reaching
* the new SQ head (and possibly more).
*/
u32 dropped;
/*
* Runtime flags
*
* Written by the kernel, shouldn't be modified by the
* application.
*
* The application needs a full memory barrier before checking
* for IORING_SQ_NEED_WAKEUP after updating the sq tail.
*/
u32 flags;
/*
* Ring buffer of indices into array of io_uring_sqe, which is
* mmapped by the application using the IORING_OFF_SQES offset.
*
* This indirection could e.g. be used to assign fixed
* io_uring_sqe entries to operations and only submit them to
* the queue when needed.
*
* The kernel modifies neither the indices array nor the entries
* array.
*/
u32 array[];
};
/*
* This data is shared with the application through the mmap at offset
* IORING_OFF_CQ_RING.
*
* The offsets to the member fields are published through struct
* io_cqring_offsets when calling io_uring_setup.
*/
struct io_cq_ring {
/*
* Head and tail offsets into the ring; the offsets need to be
* masked to get valid indices.
*
* The application controls head and the kernel tail.
*/
struct io_uring r;
/*
* Bitmask to apply to head and tail offsets (constant, equals
* ring_entries - 1)
*/
u32 ring_mask;
/* Ring size (constant, power of 2) */
u32 ring_entries;
/*
* Number of completion events lost because the queue was full;
* this should be avoided by the application by making sure
* there are not more requests pending thatn there is space in
* the completion queue.
*
* Written by the kernel, shouldn't be modified by the
* application (i.e. get number of "new events" by comparing to
* cached value).
*
* As completion events come in out of order this counter is not
* ordered with any other data.
*/
u32 overflow;
/*
* Ring buffer of completion events.
*
* The kernel writes completion events fresh every time they are
* produced, so the application is allowed to modify pending
* entries.
*/
struct io_uring_cqe cqes[];
};
......@@ -221,7 +322,7 @@ struct io_kiocb {
struct list_head list;
unsigned int flags;
refcount_t refs;
#define REQ_F_FORCE_NONBLOCK 1 /* inline submission attempt */
#define REQ_F_NOWAIT 1 /* must not punt to workers */
#define REQ_F_IOPOLL_COMPLETED 2 /* polled IO has completed */
#define REQ_F_FIXED_FILE 4 /* ctx owns file */
#define REQ_F_SEQ_PREV 8 /* sequential with previous */
......@@ -317,12 +418,6 @@ static void io_commit_cqring(struct io_ring_ctx *ctx)
/* order cqe stores with ring update */
smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
/*
* Write sider barrier of tail update, app has read side. See
* comment at the top of this file.
*/
smp_wmb();
if (wq_has_sleeper(&ctx->cq_wait)) {
wake_up_interruptible(&ctx->cq_wait);
kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
......@@ -336,8 +431,11 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
unsigned tail;
tail = ctx->cached_cq_tail;
/* See comment at the top of the file */
smp_rmb();
/*
* writes to the cq entry need to come after reading head; the
* control dependency is enough as we're using WRITE_ONCE to
* fill the cq entry
*/
if (tail - READ_ONCE(ring->r.head) == ring->ring_entries)
return NULL;
......@@ -774,10 +872,14 @@ static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
if (unlikely(ret))
return ret;
if (force_nonblock) {
/* don't allow async punt if RWF_NOWAIT was requested */
if (kiocb->ki_flags & IOCB_NOWAIT)
req->flags |= REQ_F_NOWAIT;
if (force_nonblock)
kiocb->ki_flags |= IOCB_NOWAIT;
req->flags |= REQ_F_FORCE_NONBLOCK;
}
if (ctx->flags & IORING_SETUP_IOPOLL) {
if (!(kiocb->ki_flags & IOCB_DIRECT) ||
!kiocb->ki_filp->f_op->iopoll)
......@@ -1436,8 +1538,7 @@ static void io_sq_wq_submit_work(struct work_struct *work)
struct sqe_submit *s = &req->submit;
const struct io_uring_sqe *sqe = s->sqe;
/* Ensure we clear previously set forced non-block flag */
req->flags &= ~REQ_F_FORCE_NONBLOCK;
/* Ensure we clear previously set non-block flag */
req->rw.ki_flags &= ~IOCB_NOWAIT;
ret = 0;
......@@ -1467,10 +1568,11 @@ static void io_sq_wq_submit_work(struct work_struct *work)
break;
cond_resched();
} while (1);
/* drop submission reference */
io_put_req(req);
}
/* drop submission reference */
io_put_req(req);
if (ret) {
io_cqring_add_event(ctx, sqe->user_data, ret, 0);
io_put_req(req);
......@@ -1623,7 +1725,7 @@ static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
goto out;
ret = __io_submit_sqe(ctx, req, s, true);
if (ret == -EAGAIN) {
if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
struct io_uring_sqe *sqe_copy;
sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
......@@ -1697,23 +1799,9 @@ static void io_commit_sqring(struct io_ring_ctx *ctx)
* write new data to them.
*/
smp_store_release(&ring->r.head, ctx->cached_sq_head);
/*
* write side barrier of head update, app has read side. See
* comment at the top of this file
*/
smp_wmb();
}
}
/*
* Undo last io_get_sqring()
*/
static void io_drop_sqring(struct io_ring_ctx *ctx)
{
ctx->cached_sq_head--;
}
/*
* Fetch an sqe, if one is available. Note that s->sqe will point to memory
* that is mapped by userspace. This means that care needs to be taken to
......@@ -1736,8 +1824,6 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
* though the application is the one updating it.
*/
head = ctx->cached_sq_head;
/* See comment at the top of this file */
smp_rmb();
/* make sure SQ entry isn't read before tail */
if (head == smp_load_acquire(&ring->r.tail))
return false;
......@@ -1753,8 +1839,6 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
/* drop invalid entries */
ctx->cached_sq_head++;
ring->dropped++;
/* See comment at the top of this file */
smp_wmb();
return false;
}
......@@ -1878,13 +1962,11 @@ static int io_sq_thread(void *data)
finish_wait(&ctx->sqo_wait, &wait);
ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
smp_wmb();
continue;
}
finish_wait(&ctx->sqo_wait, &wait);
ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
smp_wmb();
}
i = 0;
......@@ -1929,7 +2011,7 @@ static int io_sq_thread(void *data)
static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
{
struct io_submit_state state, *statep = NULL;
int i, ret = 0, submit = 0;
int i, submit = 0;
if (to_submit > IO_PLUG_THRESHOLD) {
io_submit_state_start(&state, ctx, to_submit);
......@@ -1938,6 +2020,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
for (i = 0; i < to_submit; i++) {
struct sqe_submit s;
int ret;
if (!io_get_sqring(ctx, &s))
break;
......@@ -1945,21 +2028,18 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
s.has_user = true;
s.needs_lock = false;
s.needs_fixed_file = false;
submit++;
ret = io_submit_sqe(ctx, &s, statep);
if (ret) {
io_drop_sqring(ctx);
break;
}
submit++;
if (ret)
io_cqring_add_event(ctx, s.sqe->user_data, ret, 0);
}
io_commit_sqring(ctx);
if (statep)
io_submit_state_end(statep);
return submit ? submit : ret;
return submit;
}
static unsigned io_cqring_events(struct io_cq_ring *ring)
......@@ -2240,10 +2320,6 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
mmgrab(current->mm);
ctx->sqo_mm = current->mm;
ret = -EINVAL;
if (!cpu_possible(p->sq_thread_cpu))
goto err;
if (ctx->flags & IORING_SETUP_SQPOLL) {
ret = -EPERM;
if (!capable(CAP_SYS_ADMIN))
......@@ -2254,11 +2330,11 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
ctx->sq_thread_idle = HZ;
if (p->flags & IORING_SETUP_SQ_AFF) {
int cpu;
int cpu = array_index_nospec(p->sq_thread_cpu,
nr_cpu_ids);
cpu = array_index_nospec(p->sq_thread_cpu, NR_CPUS);
ret = -EINVAL;
if (!cpu_possible(p->sq_thread_cpu))
if (!cpu_possible(cpu))
goto err;
ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
......@@ -2321,8 +2397,12 @@ static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
static void io_mem_free(void *ptr)
{
struct page *page = virt_to_head_page(ptr);
struct page *page;
if (!ptr)
return;
page = virt_to_head_page(ptr);
if (put_page_testzero(page))
free_compound_page(page);
}
......@@ -2363,7 +2443,7 @@ static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
if (ctx->account_mem)
io_unaccount_mem(ctx->user, imu->nr_bvecs);
kfree(imu->bvec);
kvfree(imu->bvec);
imu->nr_bvecs = 0;
}
......@@ -2455,9 +2535,9 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
if (!pages || nr_pages > got_pages) {
kfree(vmas);
kfree(pages);
pages = kmalloc_array(nr_pages, sizeof(struct page *),
pages = kvmalloc_array(nr_pages, sizeof(struct page *),
GFP_KERNEL);
vmas = kmalloc_array(nr_pages,
vmas = kvmalloc_array(nr_pages,
sizeof(struct vm_area_struct *),
GFP_KERNEL);
if (!pages || !vmas) {
......@@ -2469,7 +2549,7 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
got_pages = nr_pages;
}
imu->bvec = kmalloc_array(nr_pages, sizeof(struct bio_vec),
imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
GFP_KERNEL);
ret = -ENOMEM;
if (!imu->bvec) {
......@@ -2508,6 +2588,7 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
}
if (ctx->account_mem)
io_unaccount_mem(ctx->user, nr_pages);
kvfree(imu->bvec);
goto err;
}
......@@ -2530,12 +2611,12 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
ctx->nr_user_bufs++;
}
kfree(pages);
kfree(vmas);
kvfree(pages);
kvfree(vmas);
return 0;
err:
kfree(pages);
kfree(vmas);
kvfree(pages);
kvfree(vmas);
io_sqe_buffer_unregister(ctx);
return ret;
}
......@@ -2573,7 +2654,10 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait)
__poll_t mask = 0;
poll_wait(file, &ctx->cq_wait, wait);
/* See comment at the top of this file */
/*
* synchronizes with barrier from wq_has_sleeper call in
* io_commit_cqring
*/
smp_rmb();
if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head !=
ctx->sq_ring->ring_entries)
......@@ -2687,24 +2771,12 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
mutex_lock(&ctx->uring_lock);
submitted = io_ring_submit(ctx, to_submit);
mutex_unlock(&ctx->uring_lock);
if (submitted < 0)
goto out_ctx;
}
if (flags & IORING_ENTER_GETEVENTS) {
unsigned nr_events = 0;
min_complete = min(min_complete, ctx->cq_entries);
/*
* The application could have included the 'to_submit' count
* in how many events it wanted to wait for. If we failed to
* submit the desired count, we may need to adjust the number
* of events to poll/wait for.
*/
if (submitted < to_submit)
min_complete = min_t(unsigned, submitted, min_complete);
if (ctx->flags & IORING_SETUP_IOPOLL) {
mutex_lock(&ctx->uring_lock);
ret = io_iopoll_check(ctx, &nr_events, min_complete);
......@@ -2750,17 +2822,12 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
return -EOVERFLOW;
ctx->sq_sqes = io_mem_alloc(size);
if (!ctx->sq_sqes) {
io_mem_free(ctx->sq_ring);
if (!ctx->sq_sqes)
return -ENOMEM;
}
cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
if (!cq_ring) {
io_mem_free(ctx->sq_ring);
io_mem_free(ctx->sq_sqes);
if (!cq_ring)
return -ENOMEM;
}
ctx->cq_ring = cq_ring;
cq_ring->ring_mask = p->cq_entries - 1;
......
......@@ -60,7 +60,7 @@ struct iov_iter {
static inline enum iter_type iov_iter_type(const struct iov_iter *i)
{
return i->type & ~(READ | WRITE);
return i->type & ~(READ | WRITE | ITER_BVEC_FLAG_NO_REF);
}
static inline bool iter_is_iovec(const struct iov_iter *i)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment