Commit a26ea93a authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mszeredi/fuse

Pull fuse updates from Miklos Szeredi:
 "This contains two patchsets from Maxim Patlasov.

  The first reworks the request throttling so that only async requests
  are throttled.  Wakeup of waiting async requests is also optimized.

  The second series adds support for async processing of direct IO which
  optimizes direct IO and enables the use of the AIO userspace
  interface."

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mszeredi/fuse:
  fuse: add flag to turn on async direct IO
  fuse: truncate file if async dio failed
  fuse: optimize short direct reads
  fuse: enable asynchronous processing direct IO
  fuse: make fuse_direct_io() aware about AIO
  fuse: add support of async IO
  fuse: move fuse_release_user_pages() up
  fuse: optimize wake_up
  fuse: implement exclusive wakeup for blocked_waitq
  fuse: skip blocking on allocations of synchronous requests
  fuse: add flag fc->initialized
  fuse: make request allocations for background processing explicit
parents c818c778 60b9df7a
...@@ -92,8 +92,9 @@ static ssize_t cuse_read(struct file *file, char __user *buf, size_t count, ...@@ -92,8 +92,9 @@ static ssize_t cuse_read(struct file *file, char __user *buf, size_t count,
{ {
loff_t pos = 0; loff_t pos = 0;
struct iovec iov = { .iov_base = buf, .iov_len = count }; struct iovec iov = { .iov_base = buf, .iov_len = count };
struct fuse_io_priv io = { .async = 0, .file = file };
return fuse_direct_io(file, &iov, 1, count, &pos, 0); return fuse_direct_io(&io, &iov, 1, count, &pos, 0);
} }
static ssize_t cuse_write(struct file *file, const char __user *buf, static ssize_t cuse_write(struct file *file, const char __user *buf,
...@@ -101,12 +102,13 @@ static ssize_t cuse_write(struct file *file, const char __user *buf, ...@@ -101,12 +102,13 @@ static ssize_t cuse_write(struct file *file, const char __user *buf,
{ {
loff_t pos = 0; loff_t pos = 0;
struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count }; struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count };
struct fuse_io_priv io = { .async = 0, .file = file };
/* /*
* No locking or generic_write_checks(), the server is * No locking or generic_write_checks(), the server is
* responsible for locking and sanity checks. * responsible for locking and sanity checks.
*/ */
return fuse_direct_io(file, &iov, 1, count, &pos, 1); return fuse_direct_io(&io, &iov, 1, count, &pos, 1);
} }
static int cuse_open(struct inode *inode, struct file *file) static int cuse_open(struct inode *inode, struct file *file)
...@@ -422,7 +424,7 @@ static int cuse_send_init(struct cuse_conn *cc) ...@@ -422,7 +424,7 @@ static int cuse_send_init(struct cuse_conn *cc)
BUILD_BUG_ON(CUSE_INIT_INFO_MAX > PAGE_SIZE); BUILD_BUG_ON(CUSE_INIT_INFO_MAX > PAGE_SIZE);
req = fuse_get_req(fc, 1); req = fuse_get_req_for_background(fc, 1);
if (IS_ERR(req)) { if (IS_ERR(req)) {
rc = PTR_ERR(req); rc = PTR_ERR(req);
goto err; goto err;
...@@ -504,7 +506,7 @@ static int cuse_channel_open(struct inode *inode, struct file *file) ...@@ -504,7 +506,7 @@ static int cuse_channel_open(struct inode *inode, struct file *file)
cc->fc.release = cuse_fc_release; cc->fc.release = cuse_fc_release;
cc->fc.connected = 1; cc->fc.connected = 1;
cc->fc.blocked = 0; cc->fc.initialized = 1;
rc = cuse_send_init(cc); rc = cuse_send_init(cc);
if (rc) { if (rc) {
fuse_conn_put(&cc->fc); fuse_conn_put(&cc->fc);
......
...@@ -111,7 +111,7 @@ static void restore_sigs(sigset_t *oldset) ...@@ -111,7 +111,7 @@ static void restore_sigs(sigset_t *oldset)
sigprocmask(SIG_SETMASK, oldset, NULL); sigprocmask(SIG_SETMASK, oldset, NULL);
} }
static void __fuse_get_request(struct fuse_req *req) void __fuse_get_request(struct fuse_req *req)
{ {
atomic_inc(&req->count); atomic_inc(&req->count);
} }
...@@ -130,20 +130,30 @@ static void fuse_req_init_context(struct fuse_req *req) ...@@ -130,20 +130,30 @@ static void fuse_req_init_context(struct fuse_req *req)
req->in.h.pid = current->pid; req->in.h.pid = current->pid;
} }
struct fuse_req *fuse_get_req(struct fuse_conn *fc, unsigned npages) static bool fuse_block_alloc(struct fuse_conn *fc, bool for_background)
{
return !fc->initialized || (for_background && fc->blocked);
}
static struct fuse_req *__fuse_get_req(struct fuse_conn *fc, unsigned npages,
bool for_background)
{ {
struct fuse_req *req; struct fuse_req *req;
int err;
atomic_inc(&fc->num_waiting);
if (fuse_block_alloc(fc, for_background)) {
sigset_t oldset; sigset_t oldset;
int intr; int intr;
int err;
atomic_inc(&fc->num_waiting);
block_sigs(&oldset); block_sigs(&oldset);
intr = wait_event_interruptible(fc->blocked_waitq, !fc->blocked); intr = wait_event_interruptible_exclusive(fc->blocked_waitq,
!fuse_block_alloc(fc, for_background));
restore_sigs(&oldset); restore_sigs(&oldset);
err = -EINTR; err = -EINTR;
if (intr) if (intr)
goto out; goto out;
}
err = -ENOTCONN; err = -ENOTCONN;
if (!fc->connected) if (!fc->connected)
...@@ -151,19 +161,35 @@ struct fuse_req *fuse_get_req(struct fuse_conn *fc, unsigned npages) ...@@ -151,19 +161,35 @@ struct fuse_req *fuse_get_req(struct fuse_conn *fc, unsigned npages)
req = fuse_request_alloc(npages); req = fuse_request_alloc(npages);
err = -ENOMEM; err = -ENOMEM;
if (!req) if (!req) {
if (for_background)
wake_up(&fc->blocked_waitq);
goto out; goto out;
}
fuse_req_init_context(req); fuse_req_init_context(req);
req->waiting = 1; req->waiting = 1;
req->background = for_background;
return req; return req;
out: out:
atomic_dec(&fc->num_waiting); atomic_dec(&fc->num_waiting);
return ERR_PTR(err); return ERR_PTR(err);
} }
struct fuse_req *fuse_get_req(struct fuse_conn *fc, unsigned npages)
{
return __fuse_get_req(fc, npages, false);
}
EXPORT_SYMBOL_GPL(fuse_get_req); EXPORT_SYMBOL_GPL(fuse_get_req);
struct fuse_req *fuse_get_req_for_background(struct fuse_conn *fc,
unsigned npages)
{
return __fuse_get_req(fc, npages, true);
}
EXPORT_SYMBOL_GPL(fuse_get_req_for_background);
/* /*
* Return request in fuse_file->reserved_req. However that may * Return request in fuse_file->reserved_req. However that may
* currently be in use. If that is the case, wait for it to become * currently be in use. If that is the case, wait for it to become
...@@ -225,19 +251,31 @@ struct fuse_req *fuse_get_req_nofail_nopages(struct fuse_conn *fc, ...@@ -225,19 +251,31 @@ struct fuse_req *fuse_get_req_nofail_nopages(struct fuse_conn *fc,
struct fuse_req *req; struct fuse_req *req;
atomic_inc(&fc->num_waiting); atomic_inc(&fc->num_waiting);
wait_event(fc->blocked_waitq, !fc->blocked); wait_event(fc->blocked_waitq, fc->initialized);
req = fuse_request_alloc(0); req = fuse_request_alloc(0);
if (!req) if (!req)
req = get_reserved_req(fc, file); req = get_reserved_req(fc, file);
fuse_req_init_context(req); fuse_req_init_context(req);
req->waiting = 1; req->waiting = 1;
req->background = 0;
return req; return req;
} }
void fuse_put_request(struct fuse_conn *fc, struct fuse_req *req) void fuse_put_request(struct fuse_conn *fc, struct fuse_req *req)
{ {
if (atomic_dec_and_test(&req->count)) { if (atomic_dec_and_test(&req->count)) {
if (unlikely(req->background)) {
/*
* We get here in the unlikely case that a background
* request was allocated but not sent
*/
spin_lock(&fc->lock);
if (!fc->blocked)
wake_up(&fc->blocked_waitq);
spin_unlock(&fc->lock);
}
if (req->waiting) if (req->waiting)
atomic_dec(&fc->num_waiting); atomic_dec(&fc->num_waiting);
...@@ -335,10 +373,15 @@ __releases(fc->lock) ...@@ -335,10 +373,15 @@ __releases(fc->lock)
list_del(&req->intr_entry); list_del(&req->intr_entry);
req->state = FUSE_REQ_FINISHED; req->state = FUSE_REQ_FINISHED;
if (req->background) { if (req->background) {
if (fc->num_background == fc->max_background) { req->background = 0;
if (fc->num_background == fc->max_background)
fc->blocked = 0; fc->blocked = 0;
wake_up_all(&fc->blocked_waitq);
} /* Wake up next waiter, if any */
if (!fc->blocked && waitqueue_active(&fc->blocked_waitq))
wake_up(&fc->blocked_waitq);
if (fc->num_background == fc->congestion_threshold && if (fc->num_background == fc->congestion_threshold &&
fc->connected && fc->bdi_initialized) { fc->connected && fc->bdi_initialized) {
clear_bdi_congested(&fc->bdi, BLK_RW_SYNC); clear_bdi_congested(&fc->bdi, BLK_RW_SYNC);
...@@ -442,6 +485,7 @@ __acquires(fc->lock) ...@@ -442,6 +485,7 @@ __acquires(fc->lock)
static void __fuse_request_send(struct fuse_conn *fc, struct fuse_req *req) static void __fuse_request_send(struct fuse_conn *fc, struct fuse_req *req)
{ {
BUG_ON(req->background);
spin_lock(&fc->lock); spin_lock(&fc->lock);
if (!fc->connected) if (!fc->connected)
req->out.h.error = -ENOTCONN; req->out.h.error = -ENOTCONN;
...@@ -469,7 +513,7 @@ EXPORT_SYMBOL_GPL(fuse_request_send); ...@@ -469,7 +513,7 @@ EXPORT_SYMBOL_GPL(fuse_request_send);
static void fuse_request_send_nowait_locked(struct fuse_conn *fc, static void fuse_request_send_nowait_locked(struct fuse_conn *fc,
struct fuse_req *req) struct fuse_req *req)
{ {
req->background = 1; BUG_ON(!req->background);
fc->num_background++; fc->num_background++;
if (fc->num_background == fc->max_background) if (fc->num_background == fc->max_background)
fc->blocked = 1; fc->blocked = 1;
...@@ -2071,6 +2115,7 @@ void fuse_abort_conn(struct fuse_conn *fc) ...@@ -2071,6 +2115,7 @@ void fuse_abort_conn(struct fuse_conn *fc)
if (fc->connected) { if (fc->connected) {
fc->connected = 0; fc->connected = 0;
fc->blocked = 0; fc->blocked = 0;
fc->initialized = 1;
end_io_requests(fc); end_io_requests(fc);
end_queued_requests(fc); end_queued_requests(fc);
end_polls(fc); end_polls(fc);
...@@ -2089,6 +2134,7 @@ int fuse_dev_release(struct inode *inode, struct file *file) ...@@ -2089,6 +2134,7 @@ int fuse_dev_release(struct inode *inode, struct file *file)
spin_lock(&fc->lock); spin_lock(&fc->lock);
fc->connected = 0; fc->connected = 0;
fc->blocked = 0; fc->blocked = 0;
fc->initialized = 1;
end_queued_requests(fc); end_queued_requests(fc);
end_polls(fc); end_polls(fc);
wake_up_all(&fc->blocked_waitq); wake_up_all(&fc->blocked_waitq);
......
...@@ -1562,10 +1562,9 @@ void fuse_release_nowrite(struct inode *inode) ...@@ -1562,10 +1562,9 @@ void fuse_release_nowrite(struct inode *inode)
* vmtruncate() doesn't allow for this case, so do the rlimit checking * vmtruncate() doesn't allow for this case, so do the rlimit checking
* and the actual truncation by hand. * and the actual truncation by hand.
*/ */
static int fuse_do_setattr(struct dentry *entry, struct iattr *attr, int fuse_do_setattr(struct inode *inode, struct iattr *attr,
struct file *file) struct file *file)
{ {
struct inode *inode = entry->d_inode;
struct fuse_conn *fc = get_fuse_conn(inode); struct fuse_conn *fc = get_fuse_conn(inode);
struct fuse_req *req; struct fuse_req *req;
struct fuse_setattr_in inarg; struct fuse_setattr_in inarg;
...@@ -1574,9 +1573,6 @@ static int fuse_do_setattr(struct dentry *entry, struct iattr *attr, ...@@ -1574,9 +1573,6 @@ static int fuse_do_setattr(struct dentry *entry, struct iattr *attr,
loff_t oldsize; loff_t oldsize;
int err; int err;
if (!fuse_allow_current_process(fc))
return -EACCES;
if (!(fc->flags & FUSE_DEFAULT_PERMISSIONS)) if (!(fc->flags & FUSE_DEFAULT_PERMISSIONS))
attr->ia_valid |= ATTR_FORCE; attr->ia_valid |= ATTR_FORCE;
...@@ -1671,10 +1667,15 @@ static int fuse_do_setattr(struct dentry *entry, struct iattr *attr, ...@@ -1671,10 +1667,15 @@ static int fuse_do_setattr(struct dentry *entry, struct iattr *attr,
static int fuse_setattr(struct dentry *entry, struct iattr *attr) static int fuse_setattr(struct dentry *entry, struct iattr *attr)
{ {
struct inode *inode = entry->d_inode;
if (!fuse_allow_current_process(get_fuse_conn(inode)))
return -EACCES;
if (attr->ia_valid & ATTR_FILE) if (attr->ia_valid & ATTR_FILE)
return fuse_do_setattr(entry, attr, attr->ia_file); return fuse_do_setattr(inode, attr, attr->ia_file);
else else
return fuse_do_setattr(entry, attr, NULL); return fuse_do_setattr(inode, attr, NULL);
} }
static int fuse_getattr(struct vfsmount *mnt, struct dentry *entry, static int fuse_getattr(struct vfsmount *mnt, struct dentry *entry,
......
...@@ -126,11 +126,13 @@ static void fuse_file_put(struct fuse_file *ff, bool sync) ...@@ -126,11 +126,13 @@ static void fuse_file_put(struct fuse_file *ff, bool sync)
struct fuse_req *req = ff->reserved_req; struct fuse_req *req = ff->reserved_req;
if (sync) { if (sync) {
req->background = 0;
fuse_request_send(ff->fc, req); fuse_request_send(ff->fc, req);
path_put(&req->misc.release.path); path_put(&req->misc.release.path);
fuse_put_request(ff->fc, req); fuse_put_request(ff->fc, req);
} else { } else {
req->end = fuse_release_end; req->end = fuse_release_end;
req->background = 1;
fuse_request_send_background(ff->fc, req); fuse_request_send_background(ff->fc, req);
} }
kfree(ff); kfree(ff);
...@@ -282,6 +284,7 @@ void fuse_sync_release(struct fuse_file *ff, int flags) ...@@ -282,6 +284,7 @@ void fuse_sync_release(struct fuse_file *ff, int flags)
WARN_ON(atomic_read(&ff->count) > 1); WARN_ON(atomic_read(&ff->count) > 1);
fuse_prepare_release(ff, flags, FUSE_RELEASE); fuse_prepare_release(ff, flags, FUSE_RELEASE);
ff->reserved_req->force = 1; ff->reserved_req->force = 1;
ff->reserved_req->background = 0;
fuse_request_send(ff->fc, ff->reserved_req); fuse_request_send(ff->fc, ff->reserved_req);
fuse_put_request(ff->fc, ff->reserved_req); fuse_put_request(ff->fc, ff->reserved_req);
kfree(ff); kfree(ff);
...@@ -491,9 +494,115 @@ void fuse_read_fill(struct fuse_req *req, struct file *file, loff_t pos, ...@@ -491,9 +494,115 @@ void fuse_read_fill(struct fuse_req *req, struct file *file, loff_t pos,
req->out.args[0].size = count; req->out.args[0].size = count;
} }
static size_t fuse_send_read(struct fuse_req *req, struct file *file, static void fuse_release_user_pages(struct fuse_req *req, int write)
{
unsigned i;
for (i = 0; i < req->num_pages; i++) {
struct page *page = req->pages[i];
if (write)
set_page_dirty_lock(page);
put_page(page);
}
}
/**
* In case of short read, the caller sets 'pos' to the position of
* actual end of fuse request in IO request. Otherwise, if bytes_requested
* == bytes_transferred or rw == WRITE, the caller sets 'pos' to -1.
*
* An example:
* User requested DIO read of 64K. It was splitted into two 32K fuse requests,
* both submitted asynchronously. The first of them was ACKed by userspace as
* fully completed (req->out.args[0].size == 32K) resulting in pos == -1. The
* second request was ACKed as short, e.g. only 1K was read, resulting in
* pos == 33K.
*
* Thus, when all fuse requests are completed, the minimal non-negative 'pos'
* will be equal to the length of the longest contiguous fragment of
* transferred data starting from the beginning of IO request.
*/
static void fuse_aio_complete(struct fuse_io_priv *io, int err, ssize_t pos)
{
int left;
spin_lock(&io->lock);
if (err)
io->err = io->err ? : err;
else if (pos >= 0 && (io->bytes < 0 || pos < io->bytes))
io->bytes = pos;
left = --io->reqs;
spin_unlock(&io->lock);
if (!left) {
long res;
if (io->err)
res = io->err;
else if (io->bytes >= 0 && io->write)
res = -EIO;
else {
res = io->bytes < 0 ? io->size : io->bytes;
if (!is_sync_kiocb(io->iocb)) {
struct path *path = &io->iocb->ki_filp->f_path;
struct inode *inode = path->dentry->d_inode;
struct fuse_conn *fc = get_fuse_conn(inode);
struct fuse_inode *fi = get_fuse_inode(inode);
spin_lock(&fc->lock);
fi->attr_version = ++fc->attr_version;
spin_unlock(&fc->lock);
}
}
aio_complete(io->iocb, res, 0);
kfree(io);
}
}
static void fuse_aio_complete_req(struct fuse_conn *fc, struct fuse_req *req)
{
struct fuse_io_priv *io = req->io;
ssize_t pos = -1;
fuse_release_user_pages(req, !io->write);
if (io->write) {
if (req->misc.write.in.size != req->misc.write.out.size)
pos = req->misc.write.in.offset - io->offset +
req->misc.write.out.size;
} else {
if (req->misc.read.in.size != req->out.args[0].size)
pos = req->misc.read.in.offset - io->offset +
req->out.args[0].size;
}
fuse_aio_complete(io, req->out.h.error, pos);
}
static size_t fuse_async_req_send(struct fuse_conn *fc, struct fuse_req *req,
size_t num_bytes, struct fuse_io_priv *io)
{
spin_lock(&io->lock);
io->size += num_bytes;
io->reqs++;
spin_unlock(&io->lock);
req->io = io;
req->end = fuse_aio_complete_req;
__fuse_get_request(req);
fuse_request_send_background(fc, req);
return num_bytes;
}
static size_t fuse_send_read(struct fuse_req *req, struct fuse_io_priv *io,
loff_t pos, size_t count, fl_owner_t owner) loff_t pos, size_t count, fl_owner_t owner)
{ {
struct file *file = io->file;
struct fuse_file *ff = file->private_data; struct fuse_file *ff = file->private_data;
struct fuse_conn *fc = ff->fc; struct fuse_conn *fc = ff->fc;
...@@ -504,6 +613,10 @@ static size_t fuse_send_read(struct fuse_req *req, struct file *file, ...@@ -504,6 +613,10 @@ static size_t fuse_send_read(struct fuse_req *req, struct file *file,
inarg->read_flags |= FUSE_READ_LOCKOWNER; inarg->read_flags |= FUSE_READ_LOCKOWNER;
inarg->lock_owner = fuse_lock_owner_id(fc, owner); inarg->lock_owner = fuse_lock_owner_id(fc, owner);
} }
if (io->async)
return fuse_async_req_send(fc, req, count, io);
fuse_request_send(fc, req); fuse_request_send(fc, req);
return req->out.args[0].size; return req->out.args[0].size;
} }
...@@ -524,6 +637,7 @@ static void fuse_read_update_size(struct inode *inode, loff_t size, ...@@ -524,6 +637,7 @@ static void fuse_read_update_size(struct inode *inode, loff_t size,
static int fuse_readpage(struct file *file, struct page *page) static int fuse_readpage(struct file *file, struct page *page)
{ {
struct fuse_io_priv io = { .async = 0, .file = file };
struct inode *inode = page->mapping->host; struct inode *inode = page->mapping->host;
struct fuse_conn *fc = get_fuse_conn(inode); struct fuse_conn *fc = get_fuse_conn(inode);
struct fuse_req *req; struct fuse_req *req;
...@@ -556,7 +670,7 @@ static int fuse_readpage(struct file *file, struct page *page) ...@@ -556,7 +670,7 @@ static int fuse_readpage(struct file *file, struct page *page)
req->num_pages = 1; req->num_pages = 1;
req->pages[0] = page; req->pages[0] = page;
req->page_descs[0].length = count; req->page_descs[0].length = count;
num_read = fuse_send_read(req, file, pos, count, NULL); num_read = fuse_send_read(req, &io, pos, count, NULL);
err = req->out.h.error; err = req->out.h.error;
fuse_put_request(fc, req); fuse_put_request(fc, req);
...@@ -661,7 +775,12 @@ static int fuse_readpages_fill(void *_data, struct page *page) ...@@ -661,7 +775,12 @@ static int fuse_readpages_fill(void *_data, struct page *page)
int nr_alloc = min_t(unsigned, data->nr_pages, int nr_alloc = min_t(unsigned, data->nr_pages,
FUSE_MAX_PAGES_PER_REQ); FUSE_MAX_PAGES_PER_REQ);
fuse_send_readpages(req, data->file); fuse_send_readpages(req, data->file);
data->req = req = fuse_get_req(fc, nr_alloc); if (fc->async_read)
req = fuse_get_req_for_background(fc, nr_alloc);
else
req = fuse_get_req(fc, nr_alloc);
data->req = req;
if (IS_ERR(req)) { if (IS_ERR(req)) {
unlock_page(page); unlock_page(page);
return PTR_ERR(req); return PTR_ERR(req);
...@@ -696,6 +815,9 @@ static int fuse_readpages(struct file *file, struct address_space *mapping, ...@@ -696,6 +815,9 @@ static int fuse_readpages(struct file *file, struct address_space *mapping,
data.file = file; data.file = file;
data.inode = inode; data.inode = inode;
if (fc->async_read)
data.req = fuse_get_req_for_background(fc, nr_alloc);
else
data.req = fuse_get_req(fc, nr_alloc); data.req = fuse_get_req(fc, nr_alloc);
data.nr_pages = nr_pages; data.nr_pages = nr_pages;
err = PTR_ERR(data.req); err = PTR_ERR(data.req);
...@@ -758,9 +880,10 @@ static void fuse_write_fill(struct fuse_req *req, struct fuse_file *ff, ...@@ -758,9 +880,10 @@ static void fuse_write_fill(struct fuse_req *req, struct fuse_file *ff,
req->out.args[0].value = outarg; req->out.args[0].value = outarg;
} }
static size_t fuse_send_write(struct fuse_req *req, struct file *file, static size_t fuse_send_write(struct fuse_req *req, struct fuse_io_priv *io,
loff_t pos, size_t count, fl_owner_t owner) loff_t pos, size_t count, fl_owner_t owner)
{ {
struct file *file = io->file;
struct fuse_file *ff = file->private_data; struct fuse_file *ff = file->private_data;
struct fuse_conn *fc = ff->fc; struct fuse_conn *fc = ff->fc;
struct fuse_write_in *inarg = &req->misc.write.in; struct fuse_write_in *inarg = &req->misc.write.in;
...@@ -771,6 +894,10 @@ static size_t fuse_send_write(struct fuse_req *req, struct file *file, ...@@ -771,6 +894,10 @@ static size_t fuse_send_write(struct fuse_req *req, struct file *file,
inarg->write_flags |= FUSE_WRITE_LOCKOWNER; inarg->write_flags |= FUSE_WRITE_LOCKOWNER;
inarg->lock_owner = fuse_lock_owner_id(fc, owner); inarg->lock_owner = fuse_lock_owner_id(fc, owner);
} }
if (io->async)
return fuse_async_req_send(fc, req, count, io);
fuse_request_send(fc, req); fuse_request_send(fc, req);
return req->misc.write.out.size; return req->misc.write.out.size;
} }
...@@ -794,11 +921,12 @@ static size_t fuse_send_write_pages(struct fuse_req *req, struct file *file, ...@@ -794,11 +921,12 @@ static size_t fuse_send_write_pages(struct fuse_req *req, struct file *file,
size_t res; size_t res;
unsigned offset; unsigned offset;
unsigned i; unsigned i;
struct fuse_io_priv io = { .async = 0, .file = file };
for (i = 0; i < req->num_pages; i++) for (i = 0; i < req->num_pages; i++)
fuse_wait_on_page_writeback(inode, req->pages[i]->index); fuse_wait_on_page_writeback(inode, req->pages[i]->index);
res = fuse_send_write(req, file, pos, count, NULL); res = fuse_send_write(req, &io, pos, count, NULL);
offset = req->page_descs[0].offset; offset = req->page_descs[0].offset;
count = res; count = res;
...@@ -1033,18 +1161,6 @@ static ssize_t fuse_file_aio_write(struct kiocb *iocb, const struct iovec *iov, ...@@ -1033,18 +1161,6 @@ static ssize_t fuse_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
return written ? written : err; return written ? written : err;
} }
static void fuse_release_user_pages(struct fuse_req *req, int write)
{
unsigned i;
for (i = 0; i < req->num_pages; i++) {
struct page *page = req->pages[i];
if (write)
set_page_dirty_lock(page);
put_page(page);
}
}
static inline void fuse_page_descs_length_init(struct fuse_req *req, static inline void fuse_page_descs_length_init(struct fuse_req *req,
unsigned index, unsigned nr_pages) unsigned index, unsigned nr_pages)
{ {
...@@ -1146,10 +1262,11 @@ static inline int fuse_iter_npages(const struct iov_iter *ii_p) ...@@ -1146,10 +1262,11 @@ static inline int fuse_iter_npages(const struct iov_iter *ii_p)
return min(npages, FUSE_MAX_PAGES_PER_REQ); return min(npages, FUSE_MAX_PAGES_PER_REQ);
} }
ssize_t fuse_direct_io(struct file *file, const struct iovec *iov, ssize_t fuse_direct_io(struct fuse_io_priv *io, const struct iovec *iov,
unsigned long nr_segs, size_t count, loff_t *ppos, unsigned long nr_segs, size_t count, loff_t *ppos,
int write) int write)
{ {
struct file *file = io->file;
struct fuse_file *ff = file->private_data; struct fuse_file *ff = file->private_data;
struct fuse_conn *fc = ff->fc; struct fuse_conn *fc = ff->fc;
size_t nmax = write ? fc->max_write : fc->max_read; size_t nmax = write ? fc->max_write : fc->max_read;
...@@ -1175,10 +1292,11 @@ ssize_t fuse_direct_io(struct file *file, const struct iovec *iov, ...@@ -1175,10 +1292,11 @@ ssize_t fuse_direct_io(struct file *file, const struct iovec *iov,
} }
if (write) if (write)
nres = fuse_send_write(req, file, pos, nbytes, owner); nres = fuse_send_write(req, io, pos, nbytes, owner);
else else
nres = fuse_send_read(req, file, pos, nbytes, owner); nres = fuse_send_read(req, io, pos, nbytes, owner);
if (!io->async)
fuse_release_user_pages(req, !write); fuse_release_user_pages(req, !write);
if (req->out.h.error) { if (req->out.h.error) {
if (!res) if (!res)
...@@ -1209,17 +1327,19 @@ ssize_t fuse_direct_io(struct file *file, const struct iovec *iov, ...@@ -1209,17 +1327,19 @@ ssize_t fuse_direct_io(struct file *file, const struct iovec *iov,
} }
EXPORT_SYMBOL_GPL(fuse_direct_io); EXPORT_SYMBOL_GPL(fuse_direct_io);
static ssize_t __fuse_direct_read(struct file *file, const struct iovec *iov, static ssize_t __fuse_direct_read(struct fuse_io_priv *io,
unsigned long nr_segs, loff_t *ppos) const struct iovec *iov,
unsigned long nr_segs, loff_t *ppos,
size_t count)
{ {
ssize_t res; ssize_t res;
struct file *file = io->file;
struct inode *inode = file_inode(file); struct inode *inode = file_inode(file);
if (is_bad_inode(inode)) if (is_bad_inode(inode))
return -EIO; return -EIO;
res = fuse_direct_io(file, iov, nr_segs, iov_length(iov, nr_segs), res = fuse_direct_io(io, iov, nr_segs, count, ppos, 0);
ppos, 0);
fuse_invalidate_attr(inode); fuse_invalidate_attr(inode);
...@@ -1229,23 +1349,23 @@ static ssize_t __fuse_direct_read(struct file *file, const struct iovec *iov, ...@@ -1229,23 +1349,23 @@ static ssize_t __fuse_direct_read(struct file *file, const struct iovec *iov,
static ssize_t fuse_direct_read(struct file *file, char __user *buf, static ssize_t fuse_direct_read(struct file *file, char __user *buf,
size_t count, loff_t *ppos) size_t count, loff_t *ppos)
{ {
struct fuse_io_priv io = { .async = 0, .file = file };
struct iovec iov = { .iov_base = buf, .iov_len = count }; struct iovec iov = { .iov_base = buf, .iov_len = count };
return __fuse_direct_read(file, &iov, 1, ppos); return __fuse_direct_read(&io, &iov, 1, ppos, count);
} }
static ssize_t __fuse_direct_write(struct file *file, const struct iovec *iov, static ssize_t __fuse_direct_write(struct fuse_io_priv *io,
const struct iovec *iov,
unsigned long nr_segs, loff_t *ppos) unsigned long nr_segs, loff_t *ppos)
{ {
struct file *file = io->file;
struct inode *inode = file_inode(file); struct inode *inode = file_inode(file);
size_t count = iov_length(iov, nr_segs); size_t count = iov_length(iov, nr_segs);
ssize_t res; ssize_t res;
res = generic_write_checks(file, ppos, &count, 0); res = generic_write_checks(file, ppos, &count, 0);
if (!res) { if (!res)
res = fuse_direct_io(file, iov, nr_segs, count, ppos, 1); res = fuse_direct_io(io, iov, nr_segs, count, ppos, 1);
if (res > 0)
fuse_write_update_size(inode, *ppos);
}
fuse_invalidate_attr(inode); fuse_invalidate_attr(inode);
...@@ -1258,13 +1378,16 @@ static ssize_t fuse_direct_write(struct file *file, const char __user *buf, ...@@ -1258,13 +1378,16 @@ static ssize_t fuse_direct_write(struct file *file, const char __user *buf,
struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count }; struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count };
struct inode *inode = file_inode(file); struct inode *inode = file_inode(file);
ssize_t res; ssize_t res;
struct fuse_io_priv io = { .async = 0, .file = file };
if (is_bad_inode(inode)) if (is_bad_inode(inode))
return -EIO; return -EIO;
/* Don't allow parallel writes to the same file */ /* Don't allow parallel writes to the same file */
mutex_lock(&inode->i_mutex); mutex_lock(&inode->i_mutex);
res = __fuse_direct_write(file, &iov, 1, ppos); res = __fuse_direct_write(&io, &iov, 1, ppos);
if (res > 0)
fuse_write_update_size(inode, *ppos);
mutex_unlock(&inode->i_mutex); mutex_unlock(&inode->i_mutex);
return res; return res;
...@@ -1373,6 +1496,7 @@ static int fuse_writepage_locked(struct page *page) ...@@ -1373,6 +1496,7 @@ static int fuse_writepage_locked(struct page *page)
if (!req) if (!req)
goto err; goto err;
req->background = 1; /* writeback always goes to bg_queue */
tmp_page = alloc_page(GFP_NOFS | __GFP_HIGHMEM); tmp_page = alloc_page(GFP_NOFS | __GFP_HIGHMEM);
if (!tmp_page) if (!tmp_page)
goto err_free; goto err_free;
...@@ -2226,21 +2350,93 @@ int fuse_notify_poll_wakeup(struct fuse_conn *fc, ...@@ -2226,21 +2350,93 @@ int fuse_notify_poll_wakeup(struct fuse_conn *fc,
return 0; return 0;
} }
static void fuse_do_truncate(struct file *file)
{
struct inode *inode = file->f_mapping->host;
struct iattr attr;
attr.ia_valid = ATTR_SIZE;
attr.ia_size = i_size_read(inode);
attr.ia_file = file;
attr.ia_valid |= ATTR_FILE;
fuse_do_setattr(inode, &attr, file);
}
static ssize_t static ssize_t
fuse_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, fuse_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov,
loff_t offset, unsigned long nr_segs) loff_t offset, unsigned long nr_segs)
{ {
ssize_t ret = 0; ssize_t ret = 0;
struct file *file = NULL; struct file *file = iocb->ki_filp;
struct fuse_file *ff = file->private_data;
loff_t pos = 0; loff_t pos = 0;
struct inode *inode;
loff_t i_size;
size_t count = iov_length(iov, nr_segs);
struct fuse_io_priv *io;
file = iocb->ki_filp;
pos = offset; pos = offset;
inode = file->f_mapping->host;
i_size = i_size_read(inode);
/* optimization for short read */
if (rw != WRITE && offset + count > i_size) {
if (offset >= i_size)
return 0;
count = i_size - offset;
}
io = kmalloc(sizeof(struct fuse_io_priv), GFP_KERNEL);
if (!io)
return -ENOMEM;
spin_lock_init(&io->lock);
io->reqs = 1;
io->bytes = -1;
io->size = 0;
io->offset = offset;
io->write = (rw == WRITE);
io->err = 0;
io->file = file;
/*
* By default, we want to optimize all I/Os with async request
* submission to the client filesystem if supported.
*/
io->async = ff->fc->async_dio;
io->iocb = iocb;
/*
* We cannot asynchronously extend the size of a file. We have no method
* to wait on real async I/O requests, so we must submit this request
* synchronously.
*/
if (!is_sync_kiocb(iocb) && (offset + count > i_size))
io->async = false;
if (rw == WRITE) if (rw == WRITE)
ret = __fuse_direct_write(file, iov, nr_segs, &pos); ret = __fuse_direct_write(io, iov, nr_segs, &pos);
else else
ret = __fuse_direct_read(file, iov, nr_segs, &pos); ret = __fuse_direct_read(io, iov, nr_segs, &pos, count);
if (io->async) {
fuse_aio_complete(io, ret < 0 ? ret : 0, -1);
/* we have a non-extending, async request, so return */
if (ret > 0 && !is_sync_kiocb(iocb))
return -EIOCBQUEUED;
ret = wait_on_sync_kiocb(iocb);
} else {
kfree(io);
}
if (rw == WRITE) {
if (ret > 0)
fuse_write_update_size(inode, pos);
else if (ret < 0 && offset + count > i_size)
fuse_do_truncate(file);
}
return ret; return ret;
} }
......
...@@ -228,6 +228,20 @@ enum fuse_req_state { ...@@ -228,6 +228,20 @@ enum fuse_req_state {
FUSE_REQ_FINISHED FUSE_REQ_FINISHED
}; };
/** The request IO state (for asynchronous processing) */
struct fuse_io_priv {
int async;
spinlock_t lock;
unsigned reqs;
ssize_t bytes;
size_t size;
__u64 offset;
bool write;
int err;
struct kiocb *iocb;
struct file *file;
};
/** /**
* A request to the client * A request to the client
*/ */
...@@ -332,6 +346,9 @@ struct fuse_req { ...@@ -332,6 +346,9 @@ struct fuse_req {
/** Inode used in the request or NULL */ /** Inode used in the request or NULL */
struct inode *inode; struct inode *inode;
/** AIO control block */
struct fuse_io_priv *io;
/** Link on fi->writepages */ /** Link on fi->writepages */
struct list_head writepages_entry; struct list_head writepages_entry;
...@@ -417,6 +434,10 @@ struct fuse_conn { ...@@ -417,6 +434,10 @@ struct fuse_conn {
/** Batching of FORGET requests (positive indicates FORGET batch) */ /** Batching of FORGET requests (positive indicates FORGET batch) */
int forget_batch; int forget_batch;
/** Flag indicating that INIT reply has been received. Allocating
* any fuse request will be suspended until the flag is set */
int initialized;
/** Flag indicating if connection is blocked. This will be /** Flag indicating if connection is blocked. This will be
the case before the INIT reply is received, and if there the case before the INIT reply is received, and if there
are too many outstading backgrounds requests */ are too many outstading backgrounds requests */
...@@ -520,6 +541,9 @@ struct fuse_conn { ...@@ -520,6 +541,9 @@ struct fuse_conn {
/** Does the filesystem want adaptive readdirplus? */ /** Does the filesystem want adaptive readdirplus? */
unsigned readdirplus_auto:1; unsigned readdirplus_auto:1;
/** Does the filesystem support asynchronous direct-IO submission? */
unsigned async_dio:1;
/** The number of requests waiting for completion */ /** The number of requests waiting for completion */
atomic_t num_waiting; atomic_t num_waiting;
...@@ -708,6 +732,13 @@ void fuse_request_free(struct fuse_req *req); ...@@ -708,6 +732,13 @@ void fuse_request_free(struct fuse_req *req);
* caller should specify # elements in req->pages[] explicitly * caller should specify # elements in req->pages[] explicitly
*/ */
struct fuse_req *fuse_get_req(struct fuse_conn *fc, unsigned npages); struct fuse_req *fuse_get_req(struct fuse_conn *fc, unsigned npages);
struct fuse_req *fuse_get_req_for_background(struct fuse_conn *fc,
unsigned npages);
/*
* Increment reference count on request
*/
void __fuse_get_request(struct fuse_req *req);
/** /**
* Get a request, may fail with -ENOMEM, * Get a request, may fail with -ENOMEM,
...@@ -823,7 +854,7 @@ int fuse_reverse_inval_entry(struct super_block *sb, u64 parent_nodeid, ...@@ -823,7 +854,7 @@ int fuse_reverse_inval_entry(struct super_block *sb, u64 parent_nodeid,
int fuse_do_open(struct fuse_conn *fc, u64 nodeid, struct file *file, int fuse_do_open(struct fuse_conn *fc, u64 nodeid, struct file *file,
bool isdir); bool isdir);
ssize_t fuse_direct_io(struct file *file, const struct iovec *iov, ssize_t fuse_direct_io(struct fuse_io_priv *io, const struct iovec *iov,
unsigned long nr_segs, size_t count, loff_t *ppos, unsigned long nr_segs, size_t count, loff_t *ppos,
int write); int write);
long fuse_do_ioctl(struct file *file, unsigned int cmd, unsigned long arg, long fuse_do_ioctl(struct file *file, unsigned int cmd, unsigned long arg,
...@@ -835,4 +866,7 @@ int fuse_dev_release(struct inode *inode, struct file *file); ...@@ -835,4 +866,7 @@ int fuse_dev_release(struct inode *inode, struct file *file);
void fuse_write_update_size(struct inode *inode, loff_t pos); void fuse_write_update_size(struct inode *inode, loff_t pos);
int fuse_do_setattr(struct inode *inode, struct iattr *attr,
struct file *file);
#endif /* _FS_FUSE_I_H */ #endif /* _FS_FUSE_I_H */
...@@ -346,6 +346,7 @@ static void fuse_send_destroy(struct fuse_conn *fc) ...@@ -346,6 +346,7 @@ static void fuse_send_destroy(struct fuse_conn *fc)
fc->destroy_req = NULL; fc->destroy_req = NULL;
req->in.h.opcode = FUSE_DESTROY; req->in.h.opcode = FUSE_DESTROY;
req->force = 1; req->force = 1;
req->background = 0;
fuse_request_send(fc, req); fuse_request_send(fc, req);
fuse_put_request(fc, req); fuse_put_request(fc, req);
} }
...@@ -362,6 +363,7 @@ void fuse_conn_kill(struct fuse_conn *fc) ...@@ -362,6 +363,7 @@ void fuse_conn_kill(struct fuse_conn *fc)
spin_lock(&fc->lock); spin_lock(&fc->lock);
fc->connected = 0; fc->connected = 0;
fc->blocked = 0; fc->blocked = 0;
fc->initialized = 1;
spin_unlock(&fc->lock); spin_unlock(&fc->lock);
/* Flush all readers on this fs */ /* Flush all readers on this fs */
kill_fasync(&fc->fasync, SIGIO, POLL_IN); kill_fasync(&fc->fasync, SIGIO, POLL_IN);
...@@ -581,7 +583,8 @@ void fuse_conn_init(struct fuse_conn *fc) ...@@ -581,7 +583,8 @@ void fuse_conn_init(struct fuse_conn *fc)
fc->khctr = 0; fc->khctr = 0;
fc->polled_files = RB_ROOT; fc->polled_files = RB_ROOT;
fc->reqctr = 0; fc->reqctr = 0;
fc->blocked = 1; fc->blocked = 0;
fc->initialized = 0;
fc->attr_version = 1; fc->attr_version = 1;
get_random_bytes(&fc->scramble_key, sizeof(fc->scramble_key)); get_random_bytes(&fc->scramble_key, sizeof(fc->scramble_key));
} }
...@@ -868,6 +871,8 @@ static void process_init_reply(struct fuse_conn *fc, struct fuse_req *req) ...@@ -868,6 +871,8 @@ static void process_init_reply(struct fuse_conn *fc, struct fuse_req *req)
fc->do_readdirplus = 1; fc->do_readdirplus = 1;
if (arg->flags & FUSE_READDIRPLUS_AUTO) if (arg->flags & FUSE_READDIRPLUS_AUTO)
fc->readdirplus_auto = 1; fc->readdirplus_auto = 1;
if (arg->flags & FUSE_ASYNC_DIO)
fc->async_dio = 1;
} else { } else {
ra_pages = fc->max_read / PAGE_CACHE_SIZE; ra_pages = fc->max_read / PAGE_CACHE_SIZE;
fc->no_lock = 1; fc->no_lock = 1;
...@@ -880,7 +885,7 @@ static void process_init_reply(struct fuse_conn *fc, struct fuse_req *req) ...@@ -880,7 +885,7 @@ static void process_init_reply(struct fuse_conn *fc, struct fuse_req *req)
fc->max_write = max_t(unsigned, 4096, fc->max_write); fc->max_write = max_t(unsigned, 4096, fc->max_write);
fc->conn_init = 1; fc->conn_init = 1;
} }
fc->blocked = 0; fc->initialized = 1;
wake_up_all(&fc->blocked_waitq); wake_up_all(&fc->blocked_waitq);
} }
...@@ -895,7 +900,7 @@ static void fuse_send_init(struct fuse_conn *fc, struct fuse_req *req) ...@@ -895,7 +900,7 @@ static void fuse_send_init(struct fuse_conn *fc, struct fuse_req *req)
FUSE_EXPORT_SUPPORT | FUSE_BIG_WRITES | FUSE_DONT_MASK | FUSE_EXPORT_SUPPORT | FUSE_BIG_WRITES | FUSE_DONT_MASK |
FUSE_SPLICE_WRITE | FUSE_SPLICE_MOVE | FUSE_SPLICE_READ | FUSE_SPLICE_WRITE | FUSE_SPLICE_MOVE | FUSE_SPLICE_READ |
FUSE_FLOCK_LOCKS | FUSE_IOCTL_DIR | FUSE_AUTO_INVAL_DATA | FUSE_FLOCK_LOCKS | FUSE_IOCTL_DIR | FUSE_AUTO_INVAL_DATA |
FUSE_DO_READDIRPLUS | FUSE_READDIRPLUS_AUTO; FUSE_DO_READDIRPLUS | FUSE_READDIRPLUS_AUTO | FUSE_ASYNC_DIO;
req->in.h.opcode = FUSE_INIT; req->in.h.opcode = FUSE_INIT;
req->in.numargs = 1; req->in.numargs = 1;
req->in.args[0].size = sizeof(*arg); req->in.args[0].size = sizeof(*arg);
...@@ -1043,6 +1048,7 @@ static int fuse_fill_super(struct super_block *sb, void *data, int silent) ...@@ -1043,6 +1048,7 @@ static int fuse_fill_super(struct super_block *sb, void *data, int silent)
init_req = fuse_request_alloc(0); init_req = fuse_request_alloc(0);
if (!init_req) if (!init_req)
goto err_put_root; goto err_put_root;
init_req->background = 1;
if (is_bdev) { if (is_bdev) {
fc->destroy_req = fuse_request_alloc(0); fc->destroy_req = fuse_request_alloc(0);
......
...@@ -90,6 +90,9 @@ ...@@ -90,6 +90,9 @@
* 7.21 * 7.21
* - add FUSE_READDIRPLUS * - add FUSE_READDIRPLUS
* - send the requested events in POLL request * - send the requested events in POLL request
*
* 7.22
* - add FUSE_ASYNC_DIO
*/ */
#ifndef _LINUX_FUSE_H #ifndef _LINUX_FUSE_H
...@@ -125,7 +128,7 @@ ...@@ -125,7 +128,7 @@
#define FUSE_KERNEL_VERSION 7 #define FUSE_KERNEL_VERSION 7
/** Minor version number of this interface */ /** Minor version number of this interface */
#define FUSE_KERNEL_MINOR_VERSION 21 #define FUSE_KERNEL_MINOR_VERSION 22
/** The node ID of the root inode */ /** The node ID of the root inode */
#define FUSE_ROOT_ID 1 #define FUSE_ROOT_ID 1
...@@ -215,6 +218,7 @@ struct fuse_file_lock { ...@@ -215,6 +218,7 @@ struct fuse_file_lock {
* FUSE_AUTO_INVAL_DATA: automatically invalidate cached pages * FUSE_AUTO_INVAL_DATA: automatically invalidate cached pages
* FUSE_DO_READDIRPLUS: do READDIRPLUS (READDIR+LOOKUP in one) * FUSE_DO_READDIRPLUS: do READDIRPLUS (READDIR+LOOKUP in one)
* FUSE_READDIRPLUS_AUTO: adaptive readdirplus * FUSE_READDIRPLUS_AUTO: adaptive readdirplus
* FUSE_ASYNC_DIO: asynchronous direct I/O submission
*/ */
#define FUSE_ASYNC_READ (1 << 0) #define FUSE_ASYNC_READ (1 << 0)
#define FUSE_POSIX_LOCKS (1 << 1) #define FUSE_POSIX_LOCKS (1 << 1)
...@@ -231,6 +235,7 @@ struct fuse_file_lock { ...@@ -231,6 +235,7 @@ struct fuse_file_lock {
#define FUSE_AUTO_INVAL_DATA (1 << 12) #define FUSE_AUTO_INVAL_DATA (1 << 12)
#define FUSE_DO_READDIRPLUS (1 << 13) #define FUSE_DO_READDIRPLUS (1 << 13)
#define FUSE_READDIRPLUS_AUTO (1 << 14) #define FUSE_READDIRPLUS_AUTO (1 << 14)
#define FUSE_ASYNC_DIO (1 << 15)
/** /**
* CUSE INIT request/reply flags * CUSE INIT request/reply flags
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment