Commit cdc8fcb4 authored by Linus Torvalds

Merge tag 'for-5.9/io_uring-20200802' of git://git.kernel.dk/linux-block

Pull io_uring updates from Jens Axboe:
 "Lots of cleanups in here, hardening the code and/or making it easier
  to read and fixing bugs, but also one core feature change: support
  for real async buffered reads (a brief userspace sketch follows this
  message). With that in place, we just need async buffered write
  support and we're done relying on kthreads for the fast path. In
  detail:

   - Cleanup how memory accounting is done on ring setup/free (Bijan)

   - sq array offset calculation fixup (Dmitry)

   - Consistently handle blocking off O_DIRECT submission path (me)

   - Support proper async buffered reads, instead of relying on kthread
     offload for that. This uses the page waitqueue to drive retries
     from task_work, like we handle poll based retry. (me)

   - IO completion optimizations (me)

   - Fix race with accounting and ring fd install (me)

   - Support EPOLLEXCLUSIVE (Jiufei)

   - Get rid of the io_kiocb unionizing, made possible by shrinking
     other bits (Pavel)

   - Completion side cleanups (Pavel)

   - Cleanup REQ_F_ flags handling, and kill off many of them (Pavel)

   - Request environment grabbing cleanups (Pavel)

   - File and socket read/write cleanups (Pavel)

   - Improve kiocb_set_rw_flags() (Pavel)

   - Tons of fixes and cleanups (Pavel)

   - IORING_SQ_NEED_WAKEUP clear fix (Xiaoguang)"
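
To make the headline feature concrete, here is a minimal userspace sketch of an
async buffered read. It uses liburing (a reasonably recent version is assumed;
liburing itself is not part of this merge): a plain, non-O_DIRECT read submitted
this way can now be completed or retried via the page waitqueue instead of being
punted to an io-wq kthread when the data is not already in the page cache.

    #include <fcntl.h>
    #include <stdio.h>
    #include <liburing.h>

    int main(int argc, char **argv)
    {
            struct io_uring ring;
            struct io_uring_sqe *sqe;
            struct io_uring_cqe *cqe;
            char buf[4096];
            int fd, ret;

            if (argc < 2)
                    return 1;
            fd = open(argv[1], O_RDONLY);   /* buffered: no O_DIRECT */
            if (fd < 0 || io_uring_queue_init(8, &ring, 0) < 0)
                    return 1;

            sqe = io_uring_get_sqe(&ring);
            io_uring_prep_read(sqe, fd, buf, sizeof(buf), 0);
            io_uring_submit(&ring);

            ret = io_uring_wait_cqe(&ring, &cqe);
            if (!ret)
                    printf("read returned %d\n", cqe->res); /* bytes or -errno */
            io_uring_cqe_seen(&ring, cqe);
            io_uring_queue_exit(&ring);
            return 0;
    }

The same program runs unchanged on older kernels; reads that miss the page
cache are then simply handled by worker-thread offload instead.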

* tag 'for-5.9/io_uring-20200802' of git://git.kernel.dk/linux-block: (127 commits)
  io_uring: flip if handling after io_setup_async_rw
  fs: optimise kiocb_set_rw_flags()
  io_uring: don't touch 'ctx' after installing file descriptor
  io_uring: get rid of atomic FAA for cq_timeouts
  io_uring: consolidate *_check_overflow accounting
  io_uring: fix stalled deferred requests
  io_uring: fix racy overflow count reporting
  io_uring: deduplicate __io_complete_rw()
  io_uring: de-unionise io_kiocb
  io-wq: update hash bits
  io_uring: fix missing io_queue_linked_timeout()
  io_uring: mark ->work uninitialised after cleanup
  io_uring: deduplicate io_grab_files() calls
  io_uring: don't do opcode prep twice
  io_uring: clear IORING_SQ_NEED_WAKEUP after executing task works
  io_uring: batch put_task_struct()
  tasks: add put_task_struct_many()
  io_uring: return locked and pinned page accounting
  io_uring: don't miscount pinned memory
  io_uring: don't open-code recv kbuf managment
  ...
parents 382625d0 fa15bafb
@@ -960,9 +960,14 @@ static noinline_for_stack bool submit_bio_checks(struct bio *bio)
{
struct request_queue *q = bio->bi_disk->queue;
blk_status_t status = BLK_STS_IOERR;
struct blk_plug *plug;
might_sleep();
plug = blk_mq_plug(q, bio);
if (plug && plug->nowait)
bio->bi_opf |= REQ_NOWAIT;
/*
* For a REQ_NOWAIT based request, return -EOPNOTSUPP
* if queue is not a request based queue.
@@ -1802,6 +1807,7 @@ void blk_start_plug(struct blk_plug *plug)
INIT_LIST_HEAD(&plug->cb_list);
plug->rq_count = 0;
plug->multiple_queues = false;
plug->nowait = false;
/*
* Store ordering should not be needed here, since a potential
......
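
The two hunks above add a "nowait" flag to the plug, so a submitter can force
REQ_NOWAIT on every bio issued while its plug is active. A hedged sketch of a
caller (the function below is hypothetical; blk_start_plug()/blk_finish_plug(),
submit_bio() and the new field are the only kernel pieces assumed):

    #include <linux/bio.h>
    #include <linux/blkdev.h>

    /*
     * Hypothetical submitter: while plug.nowait is set, submit_bio_checks()
     * (first hunk above) tags each bio with REQ_NOWAIT, so submission fails
     * fast instead of sleeping when a queue has no room.
     */
    static void submit_batch_nowait(struct bio **bios, int nr)
    {
            struct blk_plug plug;
            int i;

            blk_start_plug(&plug);
            plug.nowait = true;             /* new field from this series */
            for (i = 0; i < nr; i++)
                    submit_bio(bios[i]);
            blk_finish_plug(&plug);
    }

io_uring's submission path is the intended consumer: marking its plug nowait
lets O_DIRECT bios issued during submission fail fast rather than block, which
is the "Consistently handle blocking off O_DIRECT submission path" item above.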
@@ -1734,7 +1734,7 @@ static int blkdev_open(struct inode * inode, struct file * filp)
*/
filp->f_flags |= O_LARGEFILE;
filp->f_mode |= FMODE_NOWAIT;
filp->f_mode |= FMODE_NOWAIT | FMODE_BUF_RASYNC;
if (filp->f_flags & O_NDELAY)
filp->f_mode |= FMODE_NDELAY;
......
@@ -3537,7 +3537,7 @@ static loff_t btrfs_file_llseek(struct file *file, loff_t offset, int whence)
static int btrfs_file_open(struct inode *inode, struct file *filp)
{
filp->f_mode |= FMODE_NOWAIT;
filp->f_mode |= FMODE_NOWAIT | FMODE_BUF_RASYNC;
return generic_file_open(inode, filp);
}
......
@@ -462,6 +462,7 @@ static void io_impersonate_work(struct io_worker *worker,
io_wq_switch_mm(worker, work);
if (worker->cur_creds != work->creds)
io_wq_switch_creds(worker, work);
current->signal->rlim[RLIMIT_FSIZE].rlim_cur = work->fsize;
}
static void io_assign_current_work(struct io_worker *worker,
@@ -489,7 +490,6 @@ static void io_worker_handle_work(struct io_worker *worker)
do {
struct io_wq_work *work;
unsigned int hash;
get_next:
/*
* If we got some work, mark us as busy. If we didn't, but
@@ -512,6 +512,7 @@ static void io_worker_handle_work(struct io_worker *worker)
/* handle a whole dependent link */
do {
struct io_wq_work *old_work, *next_hashed, *linked;
unsigned int hash = io_get_work_hash(work);
next_hashed = wq_next_work(work);
io_impersonate_work(worker, work);
@@ -522,10 +523,8 @@ static void io_worker_handle_work(struct io_worker *worker)
if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
work->flags |= IO_WQ_WORK_CANCEL;
hash = io_get_work_hash(work);
linked = old_work = work;
wq->do_work(&linked);
linked = (old_work == linked) ? NULL : linked;
old_work = work;
linked = wq->do_work(work);
work = next_hashed;
if (!work && linked && !io_wq_is_hashed(linked)) {
@@ -542,8 +541,6 @@ static void io_worker_handle_work(struct io_worker *worker)
spin_lock_irq(&wqe->lock);
wqe->hash_map &= ~BIT_ULL(hash);
wqe->flags &= ~IO_WQE_FLAG_STALLED;
/* dependent work is not hashed */
hash = -1U;
/* skip unnecessary unlock-lock wqe->lock */
if (!work)
goto get_next;
@@ -781,8 +778,7 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
struct io_wq_work *old_work = work;
work->flags |= IO_WQ_WORK_CANCEL;
wq->do_work(&work);
work = (work == old_work) ? NULL : work;
work = wq->do_work(work);
wq->free_work(old_work);
} while (work);
}
......
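
The io-wq hunks above rework dependent-link handling: do_work used to take a
struct io_wq_work ** and signalled "nothing linked" by leaving the pointer
unchanged; it now returns the next linked work item, or NULL, directly. A
hedged sketch of the new contract (only the typedef and the loop shape come
from this diff; the callback body and helper are hypothetical, and struct
io_wq is private to fs/io-wq.c):

    /* Hypothetical callback following the new io_wq_work_fn contract. */
    static struct io_wq_work *example_do_work(struct io_wq_work *work)
    {
            /* ... process 'work' ... */
            return next_linked_work(work);  /* hypothetical; NULL ends the link */
    }

    /* Caller loop, shaped like io_run_cancel() after this change. */
    static void run_link(struct io_wq *wq, struct io_wq_work *work)
    {
            do {
                    struct io_wq_work *old_work = work;

                    work = wq->do_work(work);
                    wq->free_work(old_work);
            } while (work);
    }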
@@ -5,10 +5,10 @@ struct io_wq;
enum {
IO_WQ_WORK_CANCEL = 1,
IO_WQ_WORK_HASHED = 4,
IO_WQ_WORK_UNBOUND = 32,
IO_WQ_WORK_NO_CANCEL = 256,
IO_WQ_WORK_CONCURRENT = 512,
IO_WQ_WORK_HASHED = 2,
IO_WQ_WORK_UNBOUND = 4,
IO_WQ_WORK_NO_CANCEL = 8,
IO_WQ_WORK_CONCURRENT = 16,
IO_WQ_HASH_SHIFT = 24, /* upper 8 bits are used for hash key */
};
@@ -89,6 +89,7 @@ struct io_wq_work {
struct mm_struct *mm;
const struct cred *creds;
struct fs_struct *fs;
unsigned long fsize;
unsigned flags;
};
@@ -101,7 +102,7 @@ static inline struct io_wq_work *wq_next_work(struct io_wq_work *work)
}
typedef void (free_work_fn)(struct io_wq_work *);
typedef void (io_wq_work_fn)(struct io_wq_work **);
typedef struct io_wq_work *(io_wq_work_fn)(struct io_wq_work *);
struct io_wq_data {
struct user_struct *user;
......
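
Compacting the IO_WQ_WORK_* values above keeps all flag bits in the low byte
and leaves the top 8 bits of work->flags free for the hash key
(IO_WQ_HASH_SHIFT = 24). A sketch of how the key is packed and recovered, with
assumed helper names consistent with the enum comment:

    /* Assumed helpers; 'key' must fit in 8 bits. */
    static inline void example_hash_work(struct io_wq_work *work, unsigned int key)
    {
            work->flags |= IO_WQ_WORK_HASHED | (key << IO_WQ_HASH_SHIFT);
    }

    static inline unsigned int example_work_hash(struct io_wq_work *work)
    {
            return work->flags >> IO_WQ_HASH_SHIFT;
    }

Work items carrying the same hash key are executed serially on one worker,
which is how io-wq avoids running, say, two buffered writes to the same file
concurrently.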
This source diff could not be displayed because it is too large.
@@ -1080,7 +1080,7 @@ xfs_file_open(
return -EFBIG;
if (XFS_FORCED_SHUTDOWN(XFS_M(inode->i_sb)))
return -EIO;
file->f_mode |= FMODE_NOWAIT;
file->f_mode |= FMODE_NOWAIT | FMODE_BUF_RASYNC;
return 0;
}
......
@@ -1180,6 +1180,7 @@ struct blk_plug {
struct list_head cb_list; /* md requires an unplug callback */
unsigned short rq_count;
bool multiple_queues;
bool nowait;
};
#define BLK_MAX_REQUEST_COUNT 16
#define BLK_PLUG_FLUSH_SIZE (128 * 1024)
......
@@ -175,6 +175,9 @@ typedef int (dio_iodone_t)(struct kiocb *iocb, loff_t offset,
/* File does not contribute to nr_files count */
#define FMODE_NOACCOUNT ((__force fmode_t)0x20000000)
/* File supports async buffered reads */
#define FMODE_BUF_RASYNC ((__force fmode_t)0x40000000)
/*
* Flag for rw_copy_check_uvector and compat_rw_copy_check_uvector
* that indicates that they should check the contents of the iovec are
@@ -315,6 +318,8 @@ enum rw_hint {
#define IOCB_SYNC (1 << 5)
#define IOCB_WRITE (1 << 6)
#define IOCB_NOWAIT (1 << 7)
/* iocb->ki_waitq is valid */
#define IOCB_WAITQ (1 << 8)
#define IOCB_NOIO (1 << 9)
struct kiocb {
@@ -329,7 +334,10 @@ struct kiocb {
int ki_flags;
u16 ki_hint;
u16 ki_ioprio; /* See linux/ioprio.h */
unsigned int ki_cookie; /* for ->iopoll */
union {
unsigned int ki_cookie; /* for ->iopoll */
struct wait_page_queue *ki_waitq; /* for async buffered IO */
};
randomized_struct_fields_end
};
@@ -3275,22 +3283,28 @@ static inline int iocb_flags(struct file *file)
static inline int kiocb_set_rw_flags(struct kiocb *ki, rwf_t flags)
{
int kiocb_flags = 0;
if (!flags)
return 0;
if (unlikely(flags & ~RWF_SUPPORTED))
return -EOPNOTSUPP;
if (flags & RWF_NOWAIT) {
if (!(ki->ki_filp->f_mode & FMODE_NOWAIT))
return -EOPNOTSUPP;
ki->ki_flags |= IOCB_NOWAIT;
kiocb_flags |= IOCB_NOWAIT;
}
if (flags & RWF_HIPRI)
ki->ki_flags |= IOCB_HIPRI;
kiocb_flags |= IOCB_HIPRI;
if (flags & RWF_DSYNC)
ki->ki_flags |= IOCB_DSYNC;
kiocb_flags |= IOCB_DSYNC;
if (flags & RWF_SYNC)
ki->ki_flags |= (IOCB_DSYNC | IOCB_SYNC);
kiocb_flags |= (IOCB_DSYNC | IOCB_SYNC);
if (flags & RWF_APPEND)
ki->ki_flags |= IOCB_APPEND;
kiocb_flags |= IOCB_APPEND;
ki->ki_flags |= kiocb_flags;
return 0;
}
......
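
Putting the fs.h pieces together: a file that advertises FMODE_BUF_RASYNC can
accept a buffered read whose kiocb carries IOCB_WAITQ plus a wait_page_queue,
and the read path will queue that wait entry rather than sleep. A hedged,
caller-side sketch (the function is hypothetical; the flags and fields are the
ones added above):

    #include <linux/errno.h>
    #include <linux/fs.h>
    #include <linux/pagemap.h>
    #include <linux/sched.h>
    #include <linux/wait.h>

    /*
     * Hypothetical: arm 'kiocb' for waitqueue-driven retry. 'wpq' must stay
     * alive until the read completes or the wake callback has run.
     */
    static int example_arm_async_buffered_read(struct kiocb *kiocb,
                                               struct wait_page_queue *wpq,
                                               wait_queue_func_t wake_func)
    {
            struct file *file = kiocb->ki_filp;

            if (!(file->f_mode & FMODE_BUF_RASYNC))
                    return -EOPNOTSUPP;      /* caller falls back to offload */

            init_waitqueue_func_entry(&wpq->wait, wake_func);
            wpq->wait.private = current;     /* whatever wake_func needs */
            kiocb->ki_flags |= IOCB_WAITQ;
            kiocb->ki_flags &= ~IOCB_NOWAIT; /* waiting is queued, not refused */
            kiocb->ki_waitq = wpq;
            return 0;
    }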
@@ -496,8 +496,35 @@ static inline pgoff_t linear_page_index(struct vm_area_struct *vma,
return pgoff;
}
/* This has the same layout as wait_bit_key - see fs/cachefiles/rdwr.c */
struct wait_page_key {
struct page *page;
int bit_nr;
int page_match;
};
struct wait_page_queue {
struct page *page;
int bit_nr;
wait_queue_entry_t wait;
};
static inline bool wake_page_match(struct wait_page_queue *wait_page,
struct wait_page_key *key)
{
if (wait_page->page != key->page)
return false;
key->page_match = 1;
if (wait_page->bit_nr != key->bit_nr)
return false;
return true;
}
extern void __lock_page(struct page *page);
extern int __lock_page_killable(struct page *page);
extern int __lock_page_async(struct page *page, struct wait_page_queue *wait);
extern int __lock_page_or_retry(struct page *page, struct mm_struct *mm,
unsigned int flags);
extern void unlock_page(struct page *page);
@@ -534,6 +561,22 @@ static inline int lock_page_killable(struct page *page)
return 0;
}
/*
* lock_page_async - Lock the page, unless this would block. If the page
* is already locked, then queue a callback when the page becomes unlocked.
* This callback can then retry the operation.
*
* Returns 0 if the page is locked successfully, or -EIOCBQUEUED if the page
* was already locked and the callback defined in 'wait' was queued.
*/
static inline int lock_page_async(struct page *page,
struct wait_page_queue *wait)
{
if (!trylock_page(page))
return __lock_page_async(page, wait);
return 0;
}
/*
* lock_page_or_retry - Lock the page, unless this would block and the
* caller indicated that it can handle a retry.
......
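
The wake side pairs with the wait_page_queue above: wake_page_match() lets a
waiter's callback check whether a page wakeup is for its page and bit before
acting on it. A hedged sketch of such a callback (assumed names; modelled on,
but not copied from, io_uring's retry callback):

    #include <linux/pagemap.h>
    #include <linux/wait.h>

    static int example_page_wake(struct wait_queue_entry *wait, unsigned mode,
                                 int sync, void *arg)
    {
            struct wait_page_queue *wpq = container_of(wait,
                                            struct wait_page_queue, wait);
            struct wait_page_key *key = arg;

            if (!wake_page_match(wpq, key))
                    return 0;               /* not our page or bit */

            list_del_init(&wait->entry);    /* stop receiving wakeups */
            /* ... kick the retry, e.g. queue task_work to re-issue the read ... */
            return 1;
    }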
@@ -126,6 +126,12 @@ static inline void put_task_struct(struct task_struct *t)
__put_task_struct(t);
}
static inline void put_task_struct_many(struct task_struct *t, int nr)
{
if (refcount_sub_and_test(nr, &t->usage))
__put_task_struct(t);
}
void put_task_struct_rcu_user(struct task_struct *task);
#ifdef CONFIG_ARCH_WANTS_DYNAMIC_TASK_STRUCT
......
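
put_task_struct_many() drops several task references with a single atomic
operation. A sketch of the batching pattern it enables (the accumulator struct
is hypothetical; the "io_uring: batch put_task_struct()" commit in the list
above does something similar for completed requests):

    #include <linux/sched/task.h>

    struct task_ref_batch {                 /* hypothetical accumulator */
            struct task_struct *task;
            int refs;
    };

    static void task_ref_batch_flush(struct task_ref_batch *b)
    {
            if (b->task && b->refs)
                    put_task_struct_many(b->task, b->refs); /* one sub_and_test */
            b->task = NULL;
            b->refs = 0;
    }

    static void task_ref_batch_add(struct task_ref_batch *b, struct task_struct *t)
    {
            if (b->task != t) {
                    task_ref_batch_flush(b);
                    b->task = t;
            }
            b->refs++;                      /* caller already holds this ref */
    }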
@@ -31,7 +31,8 @@ struct io_uring_sqe {
union {
__kernel_rwf_t rw_flags;
__u32 fsync_flags;
__u16 poll_events;
__u16 poll_events; /* compatibility */
__u32 poll32_events; /* word-reversed for BE */
__u32 sync_range_flags;
__u32 msg_flags;
__u32 timeout_flags;
@@ -249,6 +250,7 @@ struct io_uring_params {
#define IORING_FEAT_RW_CUR_POS (1U << 3)
#define IORING_FEAT_CUR_PERSONALITY (1U << 4)
#define IORING_FEAT_FAST_POLL (1U << 5)
#define IORING_FEAT_POLL_32BITS (1U << 6)
/*
* io_uring_register(2) opcodes and arguments
......
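
poll32_events widens the poll mask so epoll flags that do not fit in 16 bits,
EPOLLEXCLUSIVE in particular, can be requested, and IORING_FEAT_POLL_32BITS
tells userspace the kernel understands the new field. A hedged userspace
sketch (the function name is hypothetical; on big-endian machines the mask
must additionally be halfword-swapped, as the liburing hunk further down
shows):

    #include <string.h>
    #include <sys/epoll.h>
    #include <linux/io_uring.h>

    /* 'features' is io_uring_params.features, filled in by io_uring_setup(). */
    static void example_prep_poll_add(struct io_uring_sqe *sqe, int fd,
                                      unsigned int mask, unsigned int features)
    {
            memset(sqe, 0, sizeof(*sqe));
            sqe->opcode = IORING_OP_POLL_ADD;
            sqe->fd = fd;
            if (features & IORING_FEAT_POLL_32BITS)
                    sqe->poll32_events = mask;  /* e.g. EPOLLIN | EPOLLEXCLUSIVE */
            else
                    sqe->poll_events = mask;    /* legacy 16-bit field */
    }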
@@ -987,19 +987,6 @@ void __init pagecache_init(void)
page_writeback_init();
}
/* This has the same layout as wait_bit_key - see fs/cachefiles/rdwr.c */
struct wait_page_key {
struct page *page;
int bit_nr;
int page_match;
};
struct wait_page_queue {
struct page *page;
int bit_nr;
wait_queue_entry_t wait;
};
static int wake_page_function(wait_queue_entry_t *wait, unsigned mode, int sync, void *arg)
{
int ret;
@@ -1007,11 +994,7 @@ static int wake_page_function(wait_queue_entry_t *wait, unsigned mode, int sync,
struct wait_page_queue *wait_page
= container_of(wait, struct wait_page_queue, wait);
if (wait_page->page != key->page)
return 0;
key->page_match = 1;
if (wait_page->bit_nr != key->bit_nr)
if (!wake_page_match(wait_page, key))
return 0;
/*
@@ -1240,6 +1223,44 @@ int wait_on_page_bit_killable(struct page *page, int bit_nr)
}
EXPORT_SYMBOL(wait_on_page_bit_killable);
static int __wait_on_page_locked_async(struct page *page,
struct wait_page_queue *wait, bool set)
{
struct wait_queue_head *q = page_waitqueue(page);
int ret = 0;
wait->page = page;
wait->bit_nr = PG_locked;
spin_lock_irq(&q->lock);
__add_wait_queue_entry_tail(q, &wait->wait);
SetPageWaiters(page);
if (set)
ret = !trylock_page(page);
else
ret = PageLocked(page);
/*
* If we were successful now, we know we're still on the
* waitqueue as we're still under the lock. This means it's
* safe to remove and return success, we know the callback
* isn't going to trigger.
*/
if (!ret)
__remove_wait_queue(q, &wait->wait);
else
ret = -EIOCBQUEUED;
spin_unlock_irq(&q->lock);
return ret;
}
static int wait_on_page_locked_async(struct page *page,
struct wait_page_queue *wait)
{
if (!PageLocked(page))
return 0;
return __wait_on_page_locked_async(compound_head(page), wait, false);
}
/**
* put_and_wait_on_page_locked - Drop a reference and wait for it to be unlocked
* @page: The page to wait for.
@@ -1402,6 +1423,11 @@ int __lock_page_killable(struct page *__page)
}
EXPORT_SYMBOL_GPL(__lock_page_killable);
int __lock_page_async(struct page *page, struct wait_page_queue *wait)
{
return __wait_on_page_locked_async(page, wait, true);
}
/*
* Return values:
* 1 - page is locked; mmap_lock is still held.
@@ -2061,7 +2087,7 @@ ssize_t generic_file_buffered_read(struct kiocb *iocb,
page = find_get_page(mapping, index);
if (!page) {
if (iocb->ki_flags & (IOCB_NOWAIT | IOCB_NOIO))
if (iocb->ki_flags & IOCB_NOIO)
goto would_block;
page_cache_sync_readahead(mapping,
ra, filp,
@@ -2080,17 +2106,25 @@ ssize_t generic_file_buffered_read(struct kiocb *iocb,
index, last_index - index);
}
if (!PageUptodate(page)) {
if (iocb->ki_flags & IOCB_NOWAIT) {
put_page(page);
goto would_block;
}
/*
* See comment in do_read_cache_page on why
* wait_on_page_locked is used to avoid unnecessarily
* serialisations and why it's safe.
*/
error = wait_on_page_locked_killable(page);
if (iocb->ki_flags & IOCB_WAITQ) {
if (written) {
put_page(page);
goto out;
}
error = wait_on_page_locked_async(page,
iocb->ki_waitq);
} else {
if (iocb->ki_flags & IOCB_NOWAIT) {
put_page(page);
goto would_block;
}
error = wait_on_page_locked_killable(page);
}
if (unlikely(error))
goto readpage_error;
if (PageUptodate(page))
@@ -2178,7 +2212,10 @@ ssize_t generic_file_buffered_read(struct kiocb *iocb,
page_not_up_to_date:
/* Get exclusive access to the page ... */
error = lock_page_killable(page);
if (iocb->ki_flags & IOCB_WAITQ)
error = lock_page_async(page, iocb->ki_waitq);
else
error = lock_page_killable(page);
if (unlikely(error))
goto readpage_error;
@@ -2197,7 +2234,7 @@ ssize_t generic_file_buffered_read(struct kiocb *iocb,
}
readpage:
if (iocb->ki_flags & IOCB_NOIO) {
if (iocb->ki_flags & (IOCB_NOIO | IOCB_NOWAIT)) {
unlock_page(page);
put_page(page);
goto would_block;
......
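
Taken together, the filemap.c changes mean a read issued with IOCB_WAITQ never
sleeps on a page lock or on "page not up to date": it either makes progress or
returns -EIOCBQUEUED after queueing the caller's wait entry. A hedged,
caller-side sketch of the resulting flow (names are hypothetical; the real
driver of the retry is io_uring's read path):

    #include <linux/fs.h>
    #include <linux/uio.h>

    static ssize_t example_async_buffered_read(struct file *file,
                                               struct kiocb *kiocb,
                                               struct iov_iter *to)
    {
            ssize_t ret;

            ret = call_read_iter(file, kiocb, to);  /* e.g. generic_file_read_iter() */
            if (ret == -EIOCBQUEUED) {
                    /*
                     * The wait_page_queue entry behind kiocb->ki_waitq is now
                     * on the page's waitqueue. When the page is unlocked, the
                     * wake callback fires and re-issues this read, typically
                     * from task_work.
                     */
            }
            return ret;
    }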
@@ -10,6 +10,7 @@ extern "C" {
#include <string.h>
#include "../../include/uapi/linux/io_uring.h"
#include <inttypes.h>
#include <linux/swab.h>
#include "barrier.h"
/*
@@ -145,11 +146,14 @@ static inline void io_uring_prep_write_fixed(struct io_uring_sqe *sqe, int fd,
}
static inline void io_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd,
short poll_mask)
unsigned poll_mask)
{
memset(sqe, 0, sizeof(*sqe));
sqe->opcode = IORING_OP_POLL_ADD;
sqe->fd = fd;
#if __BYTE_ORDER == __BIG_ENDIAN
poll_mask = __swahw32(poll_mask);
#endif
sqe->poll_events = poll_mask;
}
......
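
With the widened prototype, callers can pass a full 32-bit epoll mask and the
helper byte-swaps it on big-endian so the kernel sees the word-reversed layout
it expects. A small usage sketch (EPOLLEXCLUSIVE support is the "Support
EPOLLEXCLUSIVE (Jiufei)" item in the merge message):

    #include <sys/epoll.h>

    /* 'sqe' obtained from the submission queue in the usual way. */
    static void example_queue_exclusive_poll(struct io_uring_sqe *sqe, int fd)
    {
            io_uring_prep_poll_add(sqe, fd, EPOLLIN | EPOLLEXCLUSIVE);
    }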