Commit c22c8389 authored by Andrew Morton, committed by Trond Myklebust

[PATCH] AIO support for raw/O_DIRECT

Patch from Badari Pulavarty <pbadari@us.ibm.com> and myself.

This patch adds AIO support for direct-IO - O_DIRECT files, O_DIRECT
blockdevs and /dev/raw/rawN.

Also, the controlling `struct dio' is now always kmalloced, saving ~400 bytes
of stack.

The best tool for testing AIO/DIO is the modified fsx-linux in ext3 CVS.
See http://www.zip.com.au/~akpm/linux/ext3/ for details.  For example:

	fsx-linux -Z -A -R -W -r 512 -w 4096 foo

Running many instances of this against ext3 currently fails with incorrect
file data; there is some bug in the new ext3 O_DIRECT support.  ext2 is OK though.
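
A minimal userspace sketch of driving the new path, assuming libaio is
installed (io_setup/io_prep_pread/io_submit/io_getevents, link with -laio)
and that `testfile' already exists, is at least 4096 bytes long and lives on
a filesystem or blockdev that supports O_DIRECT; the filename, transfer size
and alignment are illustrative only:

	#define _GNU_SOURCE		/* for O_DIRECT */
	#include <fcntl.h>
	#include <libaio.h>
	#include <stdio.h>
	#include <stdlib.h>
	#include <unistd.h>

	int main(void)
	{
		io_context_t ctx = 0;
		struct iocb cb, *cbs[1] = { &cb };
		struct io_event ev;
		void *buf;
		int fd;

		fd = open("testfile", O_RDONLY | O_DIRECT);
		if (fd < 0) {
			perror("open");
			return 1;
		}

		/* O_DIRECT wants aligned buffers, offsets and lengths */
		if (posix_memalign(&buf, 4096, 4096))
			return 1;
		if (io_setup(1, &ctx) < 0)
			return 1;

		io_prep_pread(&cb, fd, buf, 4096, 0);
		if (io_submit(ctx, 1, cbs) != 1)	/* returns once the IO is queued */
			return 1;

		/* the completion is reaped later; ev.res carries the byte count */
		if (io_getevents(ctx, 1, 1, &ev, NULL) == 1)
			printf("read %ld bytes\n", (long)ev.res);

		io_destroy(ctx);
		close(fd);
		free(buf);
		return 0;
	}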
parent 511d2652
@@ -7,12 +7,21 @@
*
* 04Jul2002 akpm@zip.com.au
* Initial version
* 11Sep2002 janetinc@us.ibm.com
* added readv/writev support.
* 29Oct2002 akpm@zip.com.au
* rewrote bio_add_page() support.
* 30Oct2002 pbadari@us.ibm.com
* added support for non-aligned IO.
* 06Nov2002 pbadari@us.ibm.com
* added asynchronous IO support.
*/
#include <linux/kernel.h>
#include <linux/types.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/pagemap.h>
#include <linux/bio.h>
@@ -101,6 +110,11 @@ struct dio {
spinlock_t bio_list_lock; /* protects bio_list */
struct bio *bio_list; /* singly linked via bi_private */
struct task_struct *waiter; /* waiting task (NULL if none) */
/* AIO related stuff */
struct kiocb *iocb; /* kiocb */
int is_async; /* is IO async ? */
int result; /* IO result */
};
/*
@@ -177,6 +191,36 @@ static struct page *dio_get_page(struct dio *dio)
return dio->pages[dio->head++];
}
/*
* Called when a BIO has been processed. If the count goes to zero then IO is
* complete and we can signal this to the AIO layer.
*/
static void finished_one_bio(struct dio *dio)
{
if (atomic_dec_and_test(&dio->bio_count)) {
if (dio->is_async) {
aio_complete(dio->iocb, dio->result, 0);
kfree(dio);
}
}
}
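/*
 * A standalone toy model of the counting scheme above: finished_one_bio()
 * drops one reference per completed BIO, and (further down, in
 * direct_io_worker()) bio_count starts at one so the submitter holds a bias
 * reference, meaning a very fast device cannot free the dio while BIOs are
 * still being submitted.  This is only an illustrative userspace sketch;
 * struct request, complete_one() and finish() are invented names, not
 * kernel code.
 */
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

struct request {
	atomic_int pending;	/* plays the role of dio->bio_count */
	int result;
};

static void finish(struct request *req)
{
	printf("request complete, result %d\n", req->result);
	free(req);		/* like kfree(dio) in finished_one_bio() */
}

static void complete_one(struct request *req)	/* like finished_one_bio() */
{
	if (atomic_fetch_sub(&req->pending, 1) == 1)
		finish(req);
}

int main(void)
{
	struct request *req = malloc(sizeof(*req));
	int i;

	if (!req)
		return 1;
	req->result = 4096;
	atomic_init(&req->pending, 1);	/* the submitter's bias reference */

	for (i = 0; i < 3; i++) {
		atomic_fetch_add(&req->pending, 1);	/* one ref per "BIO" */
		complete_one(req);	/* even instant completion cannot free req */
	}

	complete_one(req);	/* drop the bias; only now does finish() run */
	return 0;
}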
static int dio_bio_complete(struct dio *dio, struct bio *bio);
/*
* Asynchronous IO callback.
*/
static int dio_bio_end_aio(struct bio *bio, unsigned int bytes_done, int error)
{
struct dio *dio = bio->bi_private;
if (bio->bi_size)
return 1;
/* cleanup the bio */
dio_bio_complete(dio, bio);
return 0;
}
/*
* The BIO completion handler simply queues the BIO up for the process-context
* handler.
@@ -213,18 +257,28 @@ dio_bio_alloc(struct dio *dio, struct block_device *bdev,
bio->bi_bdev = bdev;
bio->bi_sector = first_sector;
if (dio->is_async)
bio->bi_end_io = dio_bio_end_aio;
else
bio->bi_end_io = dio_bio_end_io;
dio->bio = bio;
return 0;
}
/*
* In the AIO read case we speculatively dirty the pages before starting IO.
* During IO completion, any of these pages which happen to have been written
* back will be redirtied by bio_check_pages_dirty().
*/
static void dio_bio_submit(struct dio *dio)
{
struct bio *bio = dio->bio;
bio->bi_private = dio;
atomic_inc(&dio->bio_count);
if (dio->is_async && dio->rw == READ)
bio_set_pages_dirty(bio);
submit_bio(dio->rw, bio);
dio->bio = NULL;
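/*
 * Toy model of the scheme described in the comment above dio_bio_submit():
 * pages are marked dirty before the async read is submitted, and at
 * completion any page that writeback cleaned while the IO was in flight is
 * marked dirty again, so the data the read placed there is not lost.  Purely
 * an illustrative userspace sketch; this struct page and the helpers are
 * invented, not the kernel's.
 */
#include <stdbool.h>
#include <stdio.h>

struct page {
	bool dirty;
};

static void start_async_read(struct page *pages, int n)
{
	int i;

	for (i = 0; i < n; i++)
		pages[i].dirty = true;		/* like bio_set_pages_dirty() */
}

static void complete_async_read(struct page *pages, int n)
{
	int i;

	for (i = 0; i < n; i++)
		if (!pages[i].dirty)		/* cleaned by writeback mid-flight */
			pages[i].dirty = true;	/* like bio_check_pages_dirty() */
}

int main(void)
{
	struct page pages[3] = { { false }, { false }, { false } };
	int i;

	start_async_read(pages, 3);
	pages[1].dirty = false;			/* writeback races with the IO */
	complete_async_read(pages, 3);

	for (i = 0; i < 3; i++)
		printf("page %d dirty=%d\n", i, (int)pages[i].dirty);
	return 0;
}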
@@ -276,6 +330,12 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio)
struct bio_vec *bvec = bio->bi_io_vec;
int page_no;
if (!uptodate)
dio->result = -EIO;
if (dio->is_async && dio->rw == READ) {
bio_check_pages_dirty(bio); /* transfers ownership */
} else {
for (page_no = 0; page_no < bio->bi_vcnt; page_no++) {
struct page *page = bvec[page_no].bv_page;
@@ -283,8 +343,9 @@ static int dio_bio_complete(struct dio *dio, struct bio *bio)
set_page_dirty(page);
page_cache_release(page);
}
atomic_dec(&dio->bio_count);
bio_put(bio);
}
finished_one_bio(dio);
return uptodate ? 0 : -EIO;
}
@@ -393,7 +454,7 @@ static int get_more_blocks(struct dio *dio)
/*
* There is no bio. Make one now.
*/
static int dio_new_bio(struct dio *dio, sector_t blkno)
static int dio_new_bio(struct dio *dio, sector_t start_sector)
{
sector_t sector;
int ret, nr_pages;
@@ -401,7 +462,7 @@ static int dio_new_bio(struct dio *dio, sector_t blkno)
ret = dio_bio_reap(dio);
if (ret)
goto out;
sector = blkno << (dio->blkbits - 9);
sector = start_sector << (dio->blkbits - 9);
nr_pages = min(dio->pages_in_io, bio_get_nr_vecs(dio->map_bh.b_bdev));
BUG_ON(nr_pages <= 0);
ret = dio_bio_alloc(dio, dio->map_bh.b_bdev, sector, nr_pages);
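/*
 * Quick standalone check of the shift in dio_new_bio(): start_sector is
 * expressed in dio->blkbits-sized blocks, and shifting by (blkbits - 9)
 * converts it into 512-byte sectors.  blkbits = 12 (4096-byte blocks) and
 * block 100 are assumed example values for illustration.
 */
#include <stdio.h>

int main(void)
{
	unsigned int blkbits = 12;			/* 4096-byte blocks */
	unsigned long long start_sector = 100;		/* block number */
	unsigned long long sector = start_sector << (blkbits - 9);

	/* 100 * (4096 / 512) = 800 */
	printf("block %llu -> 512-byte sector %llu\n", start_sector, sector);
	return 0;
}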
@@ -750,73 +811,91 @@ static int do_direct_IO(struct dio *dio)
}
static int
direct_io_worker(int rw, struct inode *inode, const struct iovec *iov,
loff_t offset, unsigned long nr_segs, unsigned blkbits,
get_blocks_t get_blocks)
direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
const struct iovec *iov, loff_t offset, unsigned long nr_segs,
unsigned blkbits, get_blocks_t get_blocks)
{
unsigned long user_addr;
int seg, ret2, ret = 0;
struct dio dio;
size_t bytes, tot_bytes = 0;
int seg;
int ret = 0;
int ret2;
struct dio *dio;
size_t bytes;
dio.bio = NULL;
dio.inode = inode;
dio.rw = rw;
dio.blkbits = blkbits;
dio.blkfactor = inode->i_blkbits - blkbits;
dio.start_zero_done = 0;
dio.block_in_file = offset >> blkbits;
dio.blocks_available = 0;
dio = kmalloc(sizeof(*dio), GFP_KERNEL);
if (!dio)
return -ENOMEM;
dio->is_async = !is_sync_kiocb(iocb);
dio.cur_page = NULL;
dio->bio = NULL;
dio->inode = inode;
dio->rw = rw;
dio->blkbits = blkbits;
dio->blkfactor = inode->i_blkbits - blkbits;
dio->start_zero_done = 0;
dio->block_in_file = offset >> blkbits;
dio->blocks_available = 0;
dio.boundary = 0;
dio.reap_counter = 0;
dio.get_blocks = get_blocks;
dio.final_block_in_bio = -1;
dio.next_block_for_io = -1;
dio->cur_page = NULL;
dio.page_errors = 0;
dio->boundary = 0;
dio->reap_counter = 0;
dio->get_blocks = get_blocks;
dio->final_block_in_bio = -1;
dio->next_block_for_io = -1;
/* BIO completion state */
atomic_set(&dio.bio_count, 0);
spin_lock_init(&dio.bio_list_lock);
dio.bio_list = NULL;
dio.waiter = NULL;
dio.pages_in_io = 0;
dio->page_errors = 0;
dio->result = 0;
dio->iocb = iocb;
/*
* BIO completion state.
*
* ->bio_count starts out at one, and we decrement it to zero after all
* BIOs are submitted. This to avoid the situation where a really fast
* (or synchronous) device could take the count to zero while we're
* still submitting BIOs.
*/
atomic_set(&dio->bio_count, 1);
spin_lock_init(&dio->bio_list_lock);
dio->bio_list = NULL;
dio->waiter = NULL;
dio->pages_in_io = 0;
for (seg = 0; seg < nr_segs; seg++)
dio.pages_in_io += (iov[seg].iov_len >> blkbits) + 2;
dio->pages_in_io += (iov[seg].iov_len >> blkbits) + 2;
for (seg = 0; seg < nr_segs; seg++) {
user_addr = (unsigned long)iov[seg].iov_base;
bytes = iov[seg].iov_len;
/* Index into the first page of the first block */
dio.first_block_in_page = (user_addr & (PAGE_SIZE - 1)) >> blkbits;
dio.final_block_in_request = dio.block_in_file + (bytes >> blkbits);
dio->first_block_in_page = (user_addr & ~PAGE_MASK) >> blkbits;
dio->final_block_in_request = dio->block_in_file +
(bytes >> blkbits);
/* Page fetching state */
dio.head = 0;
dio.tail = 0;
dio.curr_page = 0;
dio->head = 0;
dio->tail = 0;
dio->curr_page = 0;
dio.total_pages = 0;
dio->total_pages = 0;
if (user_addr & (PAGE_SIZE-1)) {
dio.total_pages++;
dio->total_pages++;
bytes -= PAGE_SIZE - (user_addr & (PAGE_SIZE - 1));
}
dio.total_pages += (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
dio.curr_user_address = user_addr;
dio->total_pages += (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
dio->curr_user_address = user_addr;
ret = do_direct_IO(&dio);
ret = do_direct_IO(dio);
if (ret) {
dio_cleanup(&dio);
dio_cleanup(dio);
break;
}
tot_bytes += iov[seg].iov_len - ((dio.final_block_in_request -
dio.block_in_file) << blkbits);
dio->result += iov[seg].iov_len -
((dio->final_block_in_request - dio->block_in_file) <<
blkbits);
} /* end iovec loop */
@@ -824,21 +903,42 @@ direct_io_worker(int rw, struct inode *inode, const struct iovec *iov,
* There may be some unwritten disk at the end of a part-written
* fs-block-sized block. Go zero that now.
*/
dio_zero_block(&dio, 1);
dio_zero_block(dio, 1);
if (dio.cur_page) {
ret2 = dio_send_cur_page(&dio);
page_cache_release(dio.cur_page);
if (dio->cur_page) {
ret2 = dio_send_cur_page(dio);
if (ret == 0)
ret = ret2;
page_cache_release(dio->cur_page);
dio->cur_page = NULL;
}
ret2 = dio_await_completion(&dio);
if (dio->bio)
dio_bio_submit(dio);
/*
* OK, all BIOs are submitted, so we can decrement bio_count to truly
* reflect the number of to-be-processed BIOs.
*/
if (dio->is_async) {
if (ret == 0)
ret = dio->result; /* Bytes written */
finished_one_bio(dio); /* This can free the dio */
blk_run_queues();
goto out;
}
finished_one_bio(dio);
ret2 = dio_await_completion(dio);
if (ret == 0)
ret = ret2;
if (ret == 0)
ret = dio.page_errors;
ret = dio->page_errors;
return tot_bytes ? tot_bytes : ret;
if (dio->result)
ret = dio->result;
kfree(dio);
out:
return ret;
}
/*
@@ -881,7 +981,7 @@ blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode,
}
}
retval = direct_io_worker(rw, inode, iov, offset,
retval = direct_io_worker(rw, iocb, inode, iov, offset,
nr_segs, blkbits, get_blocks);
out:
return retval;
......
@@ -807,6 +807,8 @@ __generic_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
}
retval = generic_file_direct_IO(READ, iocb,
iov, pos, nr_segs);
if (retval >= 0 && !is_sync_kiocb(iocb))
retval = -EIOCBQUEUED;
if (retval > 0)
*ppos = pos + retval;
}
@@ -1691,6 +1693,8 @@ generic_file_aio_write_nolock(struct kiocb *iocb, const struct iovec *iov,
*/
if (written >= 0 && file->f_flags & O_SYNC)
status = generic_osync_inode(inode, OSYNC_METADATA);
if (written >= 0 && !is_sync_kiocb(iocb))
written = -EIOCBQUEUED;
goto out_status;
}
......