Commit b7561e51 authored by Trond Myklebust

Merge branch 'writeback'

parents 55cfcd12 ce7c252a
--- a/fs/nfs/callback_proc.c
+++ b/fs/nfs/callback_proc.c
@@ -51,7 +51,7 @@ __be32 nfs4_callback_getattr(void *argp, void *resp,
 		goto out_iput;
 	res->size = i_size_read(inode);
 	res->change_attr = delegation->change_attr;
-	if (nfsi->nrequests != 0)
+	if (nfs_have_writebacks(inode))
 		res->change_attr++;
 	res->ctime = inode->i_ctime;
 	res->mtime = inode->i_mtime;
......
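Note: the nfs_have_writebacks() test used above replaces an open-coded, locked read of nfsi->nrequests. For reference, the helper as converted by the include/linux/nfs_fs.h hunk near the end of this same merge simply reads the new atomic counter, so the check needs no locking at all:

	static inline int
	nfs_have_writebacks(struct inode *inode)
	{
		/* nrequests is an atomic_long_t after this series */
		return atomic_long_read(&NFS_I(inode)->nrequests) != 0;
	}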
--- a/fs/nfs/delegation.c
+++ b/fs/nfs/delegation.c
@@ -1089,7 +1089,7 @@ bool nfs4_delegation_flush_on_close(const struct inode *inode)
 	delegation = rcu_dereference(nfsi->delegation);
 	if (delegation == NULL || !(delegation->type & FMODE_WRITE))
 		goto out;
-	if (nfsi->nrequests < delegation->pagemod_limit)
+	if (atomic_long_read(&nfsi->nrequests) < delegation->pagemod_limit)
 		ret = false;
 out:
 	rcu_read_unlock();
......
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -616,13 +616,13 @@ nfs_direct_write_scan_commit_list(struct inode *inode,
 		struct list_head *list,
 		struct nfs_commit_info *cinfo)
 {
-	spin_lock(&cinfo->inode->i_lock);
+	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
 #ifdef CONFIG_NFS_V4_1
 	if (cinfo->ds != NULL && cinfo->ds->nwritten != 0)
 		NFS_SERVER(inode)->pnfs_curr_ld->recover_commit_reqs(list, cinfo);
 #endif
 	nfs_scan_commit_list(&cinfo->mds->list, list, cinfo, 0);
-	spin_unlock(&cinfo->inode->i_lock);
+	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 }
 
 static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
......
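Note: this is the first hunk to use the new per-inode commit_mutex, which the series introduces to replace inode->i_lock around the commit lists (the field is declared in the include/linux/nfs_fs.h hunk near the end of this diff and initialized in init_once() below). The practical gain is that a mutex holder may sleep, so commit-list walks can wait on busy requests instead of spinning. A minimal sketch of the new rule, with hypothetical surrounding code:

	struct nfs_inode *nfsi = NFS_I(inode);

	/* Any traversal or modification of the commit lists now nests
	 * under the mutex; sleeping (e.g. in nfs_wait_on_request()) is
	 * legal while scanning, which a spinlock never allowed. */
	mutex_lock(&nfsi->commit_mutex);
	/* ... walk nfsi->commit_info.list ... */
	mutex_unlock(&nfsi->commit_mutex);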
--- a/fs/nfs/inode.c
+++ b/fs/nfs/inode.c
@@ -1285,7 +1285,6 @@ static bool nfs_file_has_buffered_writers(struct nfs_inode *nfsi)
 static unsigned long nfs_wcc_update_inode(struct inode *inode, struct nfs_fattr *fattr)
 {
-	struct nfs_inode *nfsi = NFS_I(inode);
 	unsigned long ret = 0;
 
 	if ((fattr->valid & NFS_ATTR_FATTR_PRECHANGE)
@@ -1315,7 +1314,7 @@ static unsigned long nfs_wcc_update_inode(struct inode *inode, struct nfs_fattr
 	if ((fattr->valid & NFS_ATTR_FATTR_PRESIZE)
 			&& (fattr->valid & NFS_ATTR_FATTR_SIZE)
 			&& i_size_read(inode) == nfs_size_to_loff_t(fattr->pre_size)
-			&& nfsi->nrequests == 0) {
+			&& !nfs_have_writebacks(inode)) {
 		i_size_write(inode, nfs_size_to_loff_t(fattr->size));
 		ret |= NFS_INO_INVALID_ATTR;
 	}
@@ -1823,7 +1822,7 @@ static int nfs_update_inode(struct inode *inode, struct nfs_fattr *fattr)
 		if (new_isize != cur_isize) {
 			/* Do we perhaps have any outstanding writes, or has
 			 * the file grown beyond our last write? */
-			if (nfsi->nrequests == 0 || new_isize > cur_isize) {
+			if (!nfs_have_writebacks(inode) || new_isize > cur_isize) {
 				i_size_write(inode, new_isize);
 				if (!have_writers)
 					invalid |= NFS_INO_INVALID_ATTR|NFS_INO_INVALID_DATA;
@@ -2012,10 +2011,11 @@ static void init_once(void *foo)
 	INIT_LIST_HEAD(&nfsi->access_cache_entry_lru);
 	INIT_LIST_HEAD(&nfsi->access_cache_inode_lru);
 	INIT_LIST_HEAD(&nfsi->commit_info.list);
-	nfsi->nrequests = 0;
-	nfsi->commit_info.ncommit = 0;
+	atomic_long_set(&nfsi->nrequests, 0);
+	atomic_long_set(&nfsi->commit_info.ncommit, 0);
 	atomic_set(&nfsi->commit_info.rpcs_out, 0);
 	init_rwsem(&nfsi->rmdir_sem);
+	mutex_init(&nfsi->commit_mutex);
 	nfs4_init_once(nfsi);
 }
......
--- a/fs/nfs/pagelist.c
+++ b/fs/nfs/pagelist.c
@@ -134,19 +134,14 @@ EXPORT_SYMBOL_GPL(nfs_async_iocounter_wait);
 /*
  * nfs_page_group_lock - lock the head of the page group
  * @req - request in group that is to be locked
- * @nonblock - if true don't block waiting for lock
  *
- * this lock must be held if modifying the page group list
+ * this lock must be held when traversing or modifying the page
+ * group list
  *
- * return 0 on success, < 0 on error: -EDELAY if nonblocking or the
- * result from wait_on_bit_lock
- *
- * NOTE: calling with nonblock=false should always have set the
- * lock bit (see fs/buffer.c and other uses of wait_on_bit_lock
- * with TASK_UNINTERRUPTIBLE), so there is no need to check the result.
+ * return 0 on success, < 0 on error
  */
 int
-nfs_page_group_lock(struct nfs_page *req, bool nonblock)
+nfs_page_group_lock(struct nfs_page *req)
 {
 	struct nfs_page *head = req->wb_head;
@@ -155,35 +150,10 @@ nfs_page_group_lock(struct nfs_page *req, bool nonblock)
 	if (!test_and_set_bit(PG_HEADLOCK, &head->wb_flags))
 		return 0;
 
-	if (!nonblock) {
-		set_bit(PG_CONTENDED1, &head->wb_flags);
-		smp_mb__after_atomic();
-		return wait_on_bit_lock(&head->wb_flags, PG_HEADLOCK,
-				TASK_UNINTERRUPTIBLE);
-	}
-
-	return -EAGAIN;
-}
-
-/*
- * nfs_page_group_lock_wait - wait for the lock to clear, but don't grab it
- * @req - a request in the group
- *
- * This is a blocking call to wait for the group lock to be cleared.
- */
-void
-nfs_page_group_lock_wait(struct nfs_page *req)
-{
-	struct nfs_page *head = req->wb_head;
-
-	WARN_ON_ONCE(head != head->wb_head);
-
-	if (!test_bit(PG_HEADLOCK, &head->wb_flags))
-		return;
 	set_bit(PG_CONTENDED1, &head->wb_flags);
 	smp_mb__after_atomic();
-	wait_on_bit(&head->wb_flags, PG_HEADLOCK,
-		TASK_UNINTERRUPTIBLE);
+	return wait_on_bit_lock(&head->wb_flags, PG_HEADLOCK,
+				TASK_UNINTERRUPTIBLE);
 }
 
 /*
@@ -246,7 +216,7 @@ bool nfs_page_group_sync_on_bit(struct nfs_page *req, unsigned int bit)
 {
 	bool ret;
 
-	nfs_page_group_lock(req, false);
+	nfs_page_group_lock(req);
 	ret = nfs_page_group_sync_on_bit_locked(req, bit);
 	nfs_page_group_unlock(req);
 
@@ -288,9 +258,7 @@ nfs_page_group_init(struct nfs_page *req, struct nfs_page *prev)
 			inode = page_file_mapping(req->wb_page)->host;
 			set_bit(PG_INODE_REF, &req->wb_flags);
 			kref_get(&req->wb_kref);
-			spin_lock(&inode->i_lock);
-			NFS_I(inode)->nrequests++;
-			spin_unlock(&inode->i_lock);
+			atomic_long_inc(&NFS_I(inode)->nrequests);
 		}
 	}
 }
@@ -306,14 +274,11 @@ static void
 nfs_page_group_destroy(struct kref *kref)
 {
 	struct nfs_page *req = container_of(kref, struct nfs_page, wb_kref);
+	struct nfs_page *head = req->wb_head;
 	struct nfs_page *tmp, *next;
 
-	/* subrequests must release the ref on the head request */
-	if (req->wb_head != req)
-		nfs_release_request(req->wb_head);
-
 	if (!nfs_page_group_sync_on_bit(req, PG_TEARDOWN))
-		return;
+		goto out;
 
 	tmp = req;
 	do {
@@ -324,6 +289,10 @@ nfs_page_group_destroy(struct kref *kref)
 		nfs_free_request(tmp);
 		tmp = next;
 	} while (tmp != req);
+out:
+	/* subrequests must release the ref on the head request */
+	if (head != req)
+		nfs_release_request(head);
 }
 
 /**
@@ -465,6 +434,7 @@ void nfs_release_request(struct nfs_page *req)
 {
 	kref_put(&req->wb_kref, nfs_page_group_destroy);
 }
+EXPORT_SYMBOL_GPL(nfs_release_request);
 
 /**
  * nfs_wait_on_request - Wait for a request to complete.
@@ -483,6 +453,7 @@ nfs_wait_on_request(struct nfs_page *req)
 	return wait_on_bit_io(&req->wb_flags, PG_BUSY,
 			      TASK_UNINTERRUPTIBLE);
 }
+EXPORT_SYMBOL_GPL(nfs_wait_on_request);
 
 /*
  * nfs_generic_pg_test - determine if requests can be coalesced
@@ -1036,7 +1007,7 @@ static int __nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
 	unsigned int bytes_left = 0;
 	unsigned int offset, pgbase;
 
-	nfs_page_group_lock(req, false);
+	nfs_page_group_lock(req);
 
 	subreq = req;
 	bytes_left = subreq->wb_bytes;
@@ -1058,7 +1029,7 @@ static int __nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
 			if (mirror->pg_recoalesce)
 				return 0;
 			/* retry add_request for this subreq */
-			nfs_page_group_lock(req, false);
+			nfs_page_group_lock(req);
 			continue;
 		}
@@ -1155,7 +1126,7 @@ int nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
 	for (midx = 0; midx < desc->pg_mirror_count; midx++) {
 		if (midx) {
-			nfs_page_group_lock(req, false);
+			nfs_page_group_lock(req);
 
 			/* find the last request */
 			for (lastreq = req->wb_head;
......
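Note: with the nonblock argument gone, nfs_page_group_lock() is unconditionally blocking and simply returns the wait_on_bit_lock() result. For context, the unlock side that pairs with it is not touched by this diff; roughly (a sketch from memory of the tree, not part of this change):

	void
	nfs_page_group_unlock(struct nfs_page *req)
	{
		struct nfs_page *head = req->wb_head;

		smp_mb__before_atomic();
		clear_bit(PG_HEADLOCK, &head->wb_flags);
		smp_mb__after_atomic();
		/* only wake waiters if somebody flagged contention */
		if (!test_bit(PG_CONTENDED1, &head->wb_flags))
			return;
		wake_up_bit(&head->wb_flags, PG_HEADLOCK);
	}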
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -529,47 +529,6 @@ pnfs_put_lseg(struct pnfs_layout_segment *lseg)
 }
 EXPORT_SYMBOL_GPL(pnfs_put_lseg);
 
-static void pnfs_free_lseg_async_work(struct work_struct *work)
-{
-	struct pnfs_layout_segment *lseg;
-	struct pnfs_layout_hdr *lo;
-
-	lseg = container_of(work, struct pnfs_layout_segment, pls_work);
-	lo = lseg->pls_layout;
-
-	pnfs_free_lseg(lseg);
-	pnfs_put_layout_hdr(lo);
-}
-
-static void pnfs_free_lseg_async(struct pnfs_layout_segment *lseg)
-{
-	INIT_WORK(&lseg->pls_work, pnfs_free_lseg_async_work);
-	schedule_work(&lseg->pls_work);
-}
-
-void
-pnfs_put_lseg_locked(struct pnfs_layout_segment *lseg)
-{
-	if (!lseg)
-		return;
-
-	assert_spin_locked(&lseg->pls_layout->plh_inode->i_lock);
-
-	dprintk("%s: lseg %p ref %d valid %d\n", __func__, lseg,
-		atomic_read(&lseg->pls_refcount),
-		test_bit(NFS_LSEG_VALID, &lseg->pls_flags));
-
-	if (atomic_dec_and_test(&lseg->pls_refcount)) {
-		struct pnfs_layout_hdr *lo = lseg->pls_layout;
-		if (test_bit(NFS_LSEG_VALID, &lseg->pls_flags))
-			return;
-		pnfs_layout_remove_lseg(lo, lseg);
-		if (!pnfs_cache_lseg_for_layoutreturn(lo, lseg)) {
-			pnfs_get_layout_hdr(lo);
-			pnfs_free_lseg_async(lseg);
-		}
-	}
-}
-
 /*
  * is l2 fully contained in l1?
  *   start1                 end1
......
--- a/fs/nfs/pnfs.h
+++ b/fs/nfs/pnfs.h
@@ -67,7 +67,6 @@ struct pnfs_layout_segment {
 	u32 pls_seq;
 	unsigned long pls_flags;
 	struct pnfs_layout_hdr *pls_layout;
-	struct work_struct pls_work;
 };
 
 enum pnfs_try_status {
@@ -230,7 +229,6 @@ extern int nfs4_proc_layoutreturn(struct nfs4_layoutreturn *lrp, bool sync);
 /* pnfs.c */
 void pnfs_get_layout_hdr(struct pnfs_layout_hdr *lo);
 void pnfs_put_lseg(struct pnfs_layout_segment *lseg);
-void pnfs_put_lseg_locked(struct pnfs_layout_segment *lseg);
 
 void set_pnfs_layoutdriver(struct nfs_server *, const struct nfs_fh *, struct nfs_fsinfo *);
 void unset_pnfs_layoutdriver(struct nfs_server *);
......
--- a/fs/nfs/pnfs_nfs.c
+++ b/fs/nfs/pnfs_nfs.c
@@ -83,7 +83,7 @@ pnfs_generic_clear_request_commit(struct nfs_page *req,
 	}
 out:
 	nfs_request_remove_commit_list(req, cinfo);
-	pnfs_put_lseg_locked(freeme);
+	pnfs_put_lseg(freeme);
 }
 EXPORT_SYMBOL_GPL(pnfs_generic_clear_request_commit);
@@ -91,21 +91,30 @@ static int
 pnfs_generic_transfer_commit_list(struct list_head *src, struct list_head *dst,
 				  struct nfs_commit_info *cinfo, int max)
 {
-	struct nfs_page *req, *tmp;
+	struct nfs_page *req;
 	int ret = 0;
 
-	list_for_each_entry_safe(req, tmp, src, wb_list) {
-		if (!nfs_lock_request(req))
-			continue;
+	while(!list_empty(src)) {
+		req = list_first_entry(src, struct nfs_page, wb_list);
 		kref_get(&req->wb_kref);
-		if (cond_resched_lock(&cinfo->inode->i_lock))
-			list_safe_reset_next(req, tmp, wb_list);
+		if (!nfs_lock_request(req)) {
+			int status;
+
+			mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
+			status = nfs_wait_on_request(req);
+			nfs_release_request(req);
+			mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
+			if (status < 0)
+				break;
+			continue;
+		}
 		nfs_request_remove_commit_list(req, cinfo);
 		clear_bit(PG_COMMIT_TO_DS, &req->wb_flags);
 		nfs_list_add_request(req, dst);
 		ret++;
 		if ((ret == max) && !cinfo->dreq)
 			break;
+		cond_resched();
 	}
 	return ret;
 }
@@ -119,7 +128,7 @@ pnfs_generic_scan_ds_commit_list(struct pnfs_commit_bucket *bucket,
 	struct list_head *dst = &bucket->committing;
 	int ret;
 
-	lockdep_assert_held(&cinfo->inode->i_lock);
+	lockdep_assert_held(&NFS_I(cinfo->inode)->commit_mutex);
 	ret = pnfs_generic_transfer_commit_list(src, dst, cinfo, max);
 	if (ret) {
 		cinfo->ds->nwritten -= ret;
@@ -127,7 +136,7 @@ pnfs_generic_scan_ds_commit_list(struct pnfs_commit_bucket *bucket,
 		if (bucket->clseg == NULL)
 			bucket->clseg = pnfs_get_lseg(bucket->wlseg);
 		if (list_empty(src)) {
-			pnfs_put_lseg_locked(bucket->wlseg);
+			pnfs_put_lseg(bucket->wlseg);
 			bucket->wlseg = NULL;
 		}
 	}
@@ -142,7 +151,7 @@ int pnfs_generic_scan_commit_lists(struct nfs_commit_info *cinfo,
 {
 	int i, rv = 0, cnt;
 
-	lockdep_assert_held(&cinfo->inode->i_lock);
+	lockdep_assert_held(&NFS_I(cinfo->inode)->commit_mutex);
 	for (i = 0; i < cinfo->ds->nbuckets && max != 0; i++) {
 		cnt = pnfs_generic_scan_ds_commit_list(&cinfo->ds->buckets[i],
 						       cinfo, max);
@@ -162,7 +171,7 @@ void pnfs_generic_recover_commit_reqs(struct list_head *dst,
 	int nwritten;
 	int i;
 
-	lockdep_assert_held(&cinfo->inode->i_lock);
+	lockdep_assert_held(&NFS_I(cinfo->inode)->commit_mutex);
 restart:
 	for (i = 0, b = cinfo->ds->buckets; i < cinfo->ds->nbuckets; i++, b++) {
 		nwritten = pnfs_generic_transfer_commit_list(&b->written,
@@ -953,12 +962,12 @@ pnfs_layout_mark_request_commit(struct nfs_page *req,
 	struct list_head *list;
 	struct pnfs_commit_bucket *buckets;
 
-	spin_lock(&cinfo->inode->i_lock);
+	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
 	buckets = cinfo->ds->buckets;
 	list = &buckets[ds_commit_idx].written;
 	if (list_empty(list)) {
 		if (!pnfs_is_valid_lseg(lseg)) {
-			spin_unlock(&cinfo->inode->i_lock);
+			mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 			cinfo->completion_ops->resched_write(cinfo, req);
 			return;
 		}
@@ -975,7 +984,7 @@ pnfs_layout_mark_request_commit(struct nfs_page *req,
 	cinfo->ds->nwritten++;
 
 	nfs_request_add_commit_list_locked(req, list, cinfo);
-	spin_unlock(&cinfo->inode->i_lock);
+	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 	nfs_mark_page_unstable(req->wb_page, cinfo);
 }
 EXPORT_SYMBOL_GPL(pnfs_layout_mark_request_commit);
......
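Note: several hunks above share one idiom that the old spinlock made impossible. When a request on a commit list is already locked, the scanner now takes a reference, drops commit_mutex, sleeps until the request completes, then retakes the mutex and retries. A distilled sketch of that pattern (a hypothetical helper written for illustration, not code from this diff):

	static int scan_one(struct nfs_commit_info *cinfo, struct nfs_page *req)
	{
		int status;

		kref_get(&req->wb_kref);		/* keep req alive across the sleep */
		if (nfs_lock_request(req))
			return 0;			/* locked; caller moves req to dst */
		mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
		status = nfs_wait_on_request(req);	/* may sleep */
		nfs_release_request(req);
		mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
		return status < 0 ? status : -EAGAIN;	/* caller retries the walk */
	}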
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -154,6 +154,14 @@ static void nfs_context_set_write_error(struct nfs_open_context *ctx, int error)
 	set_bit(NFS_CONTEXT_ERROR_WRITE, &ctx->flags);
 }
 
+static struct nfs_page *
+nfs_page_private_request(struct page *page)
+{
+	if (!PagePrivate(page))
+		return NULL;
+	return (struct nfs_page *)page_private(page);
+}
+
 /*
  * nfs_page_find_head_request_locked - find head request associated with @page
 *
@@ -162,21 +170,41 @@ static void nfs_context_set_write_error(struct nfs_open_context *ctx, int error)
  * returns matching head request with reference held, or NULL if not found.
  */
 static struct nfs_page *
-nfs_page_find_head_request_locked(struct nfs_inode *nfsi, struct page *page)
+nfs_page_find_private_request(struct page *page)
 {
-	struct nfs_page *req = NULL;
-
-	if (PagePrivate(page))
-		req = (struct nfs_page *)page_private(page);
-	else if (unlikely(PageSwapCache(page)))
-		req = nfs_page_search_commits_for_head_request_locked(nfsi,
-			page);
+	struct address_space *mapping = page_file_mapping(page);
+	struct nfs_page *req;
 
+	if (!PagePrivate(page))
+		return NULL;
+	spin_lock(&mapping->private_lock);
+	req = nfs_page_private_request(page);
 	if (req) {
 		WARN_ON_ONCE(req->wb_head != req);
 		kref_get(&req->wb_kref);
 	}
+	spin_unlock(&mapping->private_lock);
+	return req;
+}
+
+static struct nfs_page *
+nfs_page_find_swap_request(struct page *page)
+{
+	struct inode *inode = page_file_mapping(page)->host;
+	struct nfs_inode *nfsi = NFS_I(inode);
+	struct nfs_page *req = NULL;
+
+	if (!PageSwapCache(page))
+		return NULL;
+	mutex_lock(&nfsi->commit_mutex);
+	if (PageSwapCache(page)) {
+		req = nfs_page_search_commits_for_head_request_locked(nfsi,
+			page);
+		if (req) {
+			WARN_ON_ONCE(req->wb_head != req);
+			kref_get(&req->wb_kref);
+		}
+	}
+	mutex_unlock(&nfsi->commit_mutex);
 	return req;
 }
@@ -187,12 +215,11 @@ nfs_page_find_head_request_locked(struct nfs_inode *nfsi, struct page *page)
  */
 static struct nfs_page *nfs_page_find_head_request(struct page *page)
 {
-	struct inode *inode = page_file_mapping(page)->host;
-	struct nfs_page *req = NULL;
+	struct nfs_page *req;
 
-	spin_lock(&inode->i_lock);
-	req = nfs_page_find_head_request_locked(NFS_I(inode), page);
-	spin_unlock(&inode->i_lock);
+	req = nfs_page_find_private_request(page);
+	if (!req)
+		req = nfs_page_find_swap_request(page);
 	return req;
 }
@@ -241,9 +268,6 @@ nfs_page_group_search_locked(struct nfs_page *head, unsigned int page_offset)
 {
 	struct nfs_page *req;
 
-	WARN_ON_ONCE(head != head->wb_head);
-	WARN_ON_ONCE(!test_bit(PG_HEADLOCK, &head->wb_head->wb_flags));
-
 	req = head;
 	do {
 		if (page_offset >= req->wb_pgbase &&
@@ -269,20 +293,17 @@ static bool nfs_page_group_covers_page(struct nfs_page *req)
 	unsigned int pos = 0;
 	unsigned int len = nfs_page_length(req->wb_page);
 
-	nfs_page_group_lock(req, false);
+	nfs_page_group_lock(req);
 
-	do {
+	for (;;) {
 		tmp = nfs_page_group_search_locked(req->wb_head, pos);
-		if (tmp) {
-			/* no way this should happen */
-			WARN_ON_ONCE(tmp->wb_pgbase != pos);
-			pos += tmp->wb_bytes - (pos - tmp->wb_pgbase);
-		}
-	} while (tmp && pos < len);
+		if (!tmp)
+			break;
+		pos = tmp->wb_pgbase + tmp->wb_bytes;
+	}
 
 	nfs_page_group_unlock(req);
-	WARN_ON_ONCE(pos > len);
-	return pos == len;
+	return pos >= len;
 }
 
 /* We can set the PG_uptodate flag if we see that a write request
@@ -333,8 +354,11 @@ static void nfs_end_page_writeback(struct nfs_page *req)
 {
 	struct inode *inode = page_file_mapping(req->wb_page)->host;
 	struct nfs_server *nfss = NFS_SERVER(inode);
+	bool is_done;
 
-	if (!nfs_page_group_sync_on_bit(req, PG_WB_END))
+	is_done = nfs_page_group_sync_on_bit(req, PG_WB_END);
+	nfs_unlock_request(req);
+	if (!is_done)
 		return;
 
 	end_page_writeback(req->wb_page);
@@ -342,22 +366,6 @@ static void nfs_end_page_writeback(struct nfs_page *req)
 		clear_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
 }
 
-/* nfs_page_group_clear_bits
- *   @req - an nfs request
- * clears all page group related bits from @req
- */
-static void
-nfs_page_group_clear_bits(struct nfs_page *req)
-{
-	clear_bit(PG_TEARDOWN, &req->wb_flags);
-	clear_bit(PG_UNLOCKPAGE, &req->wb_flags);
-	clear_bit(PG_UPTODATE, &req->wb_flags);
-	clear_bit(PG_WB_END, &req->wb_flags);
-	clear_bit(PG_REMOVE, &req->wb_flags);
-}
-
 /*
  * nfs_unroll_locks_and_wait - unlock all newly locked reqs and wait on @req
 *
@@ -366,43 +374,24 @@ nfs_page_group_clear_bits(struct nfs_page *req)
  * @inode - inode associated with request page group, must be holding inode lock
  * @head - head request of page group, must be holding head lock
  * @req - request that couldn't lock and needs to wait on the req bit lock
- * @nonblock - if true, don't actually wait
 *
- * NOTE: this must be called holding page_group bit lock and inode spin lock
- * and BOTH will be released before returning.
+ * NOTE: this must be called holding page_group bit lock
+ * which will be released before returning.
 *
 * returns 0 on success, < 0 on error.
 */
-static int
-nfs_unroll_locks_and_wait(struct inode *inode, struct nfs_page *head,
-			  struct nfs_page *req, bool nonblock)
-	__releases(&inode->i_lock)
+static void
+nfs_unroll_locks(struct inode *inode, struct nfs_page *head,
+		 struct nfs_page *req)
 {
 	struct nfs_page *tmp;
-	int ret;
 
 	/* relinquish all the locks successfully grabbed this run */
-	for (tmp = head ; tmp != req; tmp = tmp->wb_this_page)
-		nfs_unlock_request(tmp);
-
-	WARN_ON_ONCE(test_bit(PG_TEARDOWN, &req->wb_flags));
-
-	/* grab a ref on the request that will be waited on */
-	kref_get(&req->wb_kref);
-
-	nfs_page_group_unlock(head);
-	spin_unlock(&inode->i_lock);
-
-	/* release ref from nfs_page_find_head_request_locked */
-	nfs_release_request(head);
-
-	if (!nonblock)
-		ret = nfs_wait_on_request(req);
-	else
-		ret = -EAGAIN;
-	nfs_release_request(req);
-
-	return ret;
+	for (tmp = head->wb_this_page ; tmp != req; tmp = tmp->wb_this_page) {
+		if (!kref_read(&tmp->wb_kref))
+			continue;
+		nfs_unlock_and_release_request(tmp);
+	}
 }
 
 /*
@@ -417,7 +406,8 @@ nfs_unroll_locks_and_wait(struct inode *inode, struct nfs_page *head,
 */
 static void
 nfs_destroy_unlinked_subrequests(struct nfs_page *destroy_list,
-				 struct nfs_page *old_head)
+				 struct nfs_page *old_head,
+				 struct inode *inode)
 {
 	while (destroy_list) {
 		struct nfs_page *subreq = destroy_list;
@@ -428,33 +418,28 @@ nfs_destroy_unlinked_subrequests(struct nfs_page *destroy_list,
 		WARN_ON_ONCE(old_head != subreq->wb_head);
 
 		/* make sure old group is not used */
-		subreq->wb_head = subreq;
 		subreq->wb_this_page = subreq;
 
-		/* subreq is now totally disconnected from page group or any
-		 * write / commit lists. last chance to wake any waiters */
-		nfs_unlock_request(subreq);
+		clear_bit(PG_REMOVE, &subreq->wb_flags);
 
-		if (!test_bit(PG_TEARDOWN, &subreq->wb_flags)) {
-			/* release ref on old head request */
-			nfs_release_request(old_head);
+		/* Note: races with nfs_page_group_destroy() */
+		if (!kref_read(&subreq->wb_kref)) {
+			/* Check if we raced with nfs_page_group_destroy() */
+			if (test_and_clear_bit(PG_TEARDOWN, &subreq->wb_flags))
+				nfs_free_request(subreq);
+			continue;
+		}
 
-			nfs_page_group_clear_bits(subreq);
-
-			/* release the PG_INODE_REF reference */
-			if (test_and_clear_bit(PG_INODE_REF, &subreq->wb_flags))
-				nfs_release_request(subreq);
-			else
-				WARN_ON_ONCE(1);
-		} else {
-			WARN_ON_ONCE(test_bit(PG_CLEAN, &subreq->wb_flags));
-			/* zombie requests have already released the last
-			 * reference and were waiting on the rest of the
-			 * group to complete. Since it's no longer part of a
-			 * group, simply free the request */
-			nfs_page_group_clear_bits(subreq);
-			nfs_free_request(subreq);
+		subreq->wb_head = subreq;
+
+		if (test_and_clear_bit(PG_INODE_REF, &subreq->wb_flags)) {
+			nfs_release_request(subreq);
+			atomic_long_dec(&NFS_I(inode)->nrequests);
 		}
+
+		/* subreq is now totally disconnected from page group or any
+		 * write / commit lists. last chance to wake any waiters */
+		nfs_unlock_and_release_request(subreq);
 	}
 }
@@ -464,7 +449,6 @@ nfs_destroy_unlinked_subrequests(struct nfs_page *destroy_list,
 * operations for this page.
 *
 * @page - the page used to lookup the "page group" of nfs_page structures
- * @nonblock - if true, don't block waiting for request locks
 *
 * This function joins all sub requests to the head request by first
 * locking all requests in the group, cancelling any pending operations
@@ -478,7 +462,7 @@ nfs_destroy_unlinked_subrequests(struct nfs_page *destroy_list,
 * error was encountered.
 */
 static struct nfs_page *
-nfs_lock_and_join_requests(struct page *page, bool nonblock)
+nfs_lock_and_join_requests(struct page *page)
 {
 	struct inode *inode = page_file_mapping(page)->host;
 	struct nfs_page *head, *subreq;
@@ -487,43 +471,59 @@ nfs_lock_and_join_requests(struct page *page, bool nonblock)
 	int ret;
 
 try_again:
-	total_bytes = 0;
-
-	WARN_ON_ONCE(destroy_list);
-
-	spin_lock(&inode->i_lock);
-
 	/*
 	 * A reference is taken only on the head request which acts as a
 	 * reference to the whole page group - the group will not be destroyed
 	 * until the head reference is released.
 	 */
-	head = nfs_page_find_head_request_locked(NFS_I(inode), page);
-
-	if (!head) {
-		spin_unlock(&inode->i_lock);
+	head = nfs_page_find_head_request(page);
+	if (!head)
 		return NULL;
-	}
 
-	/* holding inode lock, so always make a non-blocking call to try the
-	 * page group lock */
-	ret = nfs_page_group_lock(head, true);
-	if (ret < 0) {
-		spin_unlock(&inode->i_lock);
-
-		if (!nonblock && ret == -EAGAIN) {
-			nfs_page_group_lock_wait(head);
-			nfs_release_request(head);
-			goto try_again;
-		}
-
-		nfs_release_request(head);
+	/* lock the page head first in order to avoid an ABBA inefficiency */
+	if (!nfs_lock_request(head)) {
+		ret = nfs_wait_on_request(head);
+		nfs_release_request(head);
+		if (ret < 0)
+			return ERR_PTR(ret);
+		goto try_again;
+	}
+
+	/* Ensure that nobody removed the request before we locked it */
+	if (head != nfs_page_private_request(page) && !PageSwapCache(page)) {
+		nfs_unlock_and_release_request(head);
+		goto try_again;
+	}
+
+	ret = nfs_page_group_lock(head);
+	if (ret < 0) {
+		nfs_unlock_and_release_request(head);
 		return ERR_PTR(ret);
 	}
 
 	/* lock each request in the page group */
-	subreq = head;
-	do {
+	total_bytes = head->wb_bytes;
+	for (subreq = head->wb_this_page; subreq != head;
+			subreq = subreq->wb_this_page) {
+
+		if (!kref_get_unless_zero(&subreq->wb_kref))
+			continue;
+
+		while (!nfs_lock_request(subreq)) {
+			/*
+			 * Unlock page to allow nfs_page_group_sync_on_bit()
+			 * to succeed
+			 */
+			nfs_page_group_unlock(head);
+			ret = nfs_wait_on_request(subreq);
+			if (!ret)
+				ret = nfs_page_group_lock(head);
+			if (ret < 0) {
+				nfs_unroll_locks(inode, head, subreq);
+				nfs_release_request(subreq);
+				nfs_unlock_and_release_request(head);
+				return ERR_PTR(ret);
+			}
+		}
 		/*
 		 * Subrequests are always contiguous, non overlapping
 		 * and in order - but may be repeated (mirrored writes).
@@ -534,25 +534,13 @@ nfs_lock_and_join_requests(struct page *page, bool nonblock)
 		} else if (WARN_ON_ONCE(subreq->wb_offset < head->wb_offset ||
 			    ((subreq->wb_offset + subreq->wb_bytes) >
 			     (head->wb_offset + total_bytes)))) {
+			nfs_unroll_locks(inode, head, subreq);
+			nfs_unlock_and_release_request(subreq);
 			nfs_page_group_unlock(head);
-			spin_unlock(&inode->i_lock);
+			nfs_unlock_and_release_request(head);
 			return ERR_PTR(-EIO);
 		}
-
-		if (!nfs_lock_request(subreq)) {
-			/* releases page group bit lock and
-			 * inode spin lock and all references */
-			ret = nfs_unroll_locks_and_wait(inode, head,
-				subreq, nonblock);
-
-			if (ret == 0)
-				goto try_again;
-
-			return ERR_PTR(ret);
-		}
-
-		subreq = subreq->wb_this_page;
-	} while (subreq != head);
+	}
 
 	/* Now that all requests are locked, make sure they aren't on any list.
 	 * Commit list removal accounting is done after locks are dropped */
@@ -573,34 +561,30 @@ nfs_lock_and_join_requests(struct page *page, bool nonblock)
 		head->wb_bytes = total_bytes;
 	}
 
-	/*
-	 * prepare head request to be added to new pgio descriptor
-	 */
-	nfs_page_group_clear_bits(head);
-
-	/*
-	 * some part of the group was still on the inode list - otherwise
-	 * the group wouldn't be involved in async write.
-	 * grab a reference for the head request, iff it needs one.
-	 */
-	if (!test_and_set_bit(PG_INODE_REF, &head->wb_flags))
+	/* Postpone destruction of this request */
+	if (test_and_clear_bit(PG_REMOVE, &head->wb_flags)) {
+		set_bit(PG_INODE_REF, &head->wb_flags);
 		kref_get(&head->wb_kref);
+		atomic_long_inc(&NFS_I(inode)->nrequests);
+	}
 
 	nfs_page_group_unlock(head);
 
-	/* drop lock to clean uprequests on destroy list */
-	spin_unlock(&inode->i_lock);
+	nfs_destroy_unlinked_subrequests(destroy_list, head, inode);
 
-	nfs_destroy_unlinked_subrequests(destroy_list, head);
+	/* Did we lose a race with nfs_inode_remove_request()? */
+	if (!(PagePrivate(page) || PageSwapCache(page))) {
+		nfs_unlock_and_release_request(head);
+		return NULL;
+	}
 
-	/* still holds ref on head from nfs_page_find_head_request_locked
+	/* still holds ref on head from nfs_page_find_head_request
 	 * and still has lock on head from lock loop */
 	return head;
 }
 
 static void nfs_write_error_remove_page(struct nfs_page *req)
 {
-	nfs_unlock_request(req);
 	nfs_end_page_writeback(req);
 	generic_error_remove_page(page_file_mapping(req->wb_page),
 				  req->wb_page);
@@ -624,12 +608,12 @@ nfs_error_is_fatal_on_server(int err)
 * May return an error if the user signalled nfs_wait_on_request().
 */
 static int nfs_page_async_flush(struct nfs_pageio_descriptor *pgio,
-				struct page *page, bool nonblock)
+				struct page *page)
 {
 	struct nfs_page *req;
 	int ret = 0;
 
-	req = nfs_lock_and_join_requests(page, nonblock);
+	req = nfs_lock_and_join_requests(page);
 	if (!req)
 		goto out;
 	ret = PTR_ERR(req);
@@ -672,7 +656,7 @@ static int nfs_do_writepage(struct page *page, struct writeback_control *wbc,
 	int ret;
 
 	nfs_pageio_cond_complete(pgio, page_index(page));
-	ret = nfs_page_async_flush(pgio, page, wbc->sync_mode == WB_SYNC_NONE);
+	ret = nfs_page_async_flush(pgio, page);
 	if (ret == -EAGAIN) {
 		redirty_page_for_writepage(wbc, page);
 		ret = 0;
@@ -759,6 +743,7 @@ int nfs_writepages(struct address_space *mapping, struct writeback_control *wbc)
 */
 static void nfs_inode_add_request(struct inode *inode, struct nfs_page *req)
 {
+	struct address_space *mapping = page_file_mapping(req->wb_page);
 	struct nfs_inode *nfsi = NFS_I(inode);
 
 	WARN_ON_ONCE(req->wb_this_page != req);
@@ -766,27 +751,30 @@ static void nfs_inode_add_request(struct inode *inode, struct nfs_page *req)
 	/* Lock the request! */
 	nfs_lock_request(req);
 
-	spin_lock(&inode->i_lock);
-	if (!nfsi->nrequests &&
-	    NFS_PROTO(inode)->have_delegation(inode, FMODE_WRITE))
-		inode->i_version++;
 	/*
 	 * Swap-space should not get truncated. Hence no need to plug the race
 	 * with invalidate/truncate.
 	 */
+	spin_lock(&mapping->private_lock);
+	if (!nfs_have_writebacks(inode) &&
+	    NFS_PROTO(inode)->have_delegation(inode, FMODE_WRITE)) {
+		spin_lock(&inode->i_lock);
+		inode->i_version++;
+		spin_unlock(&inode->i_lock);
+	}
 	if (likely(!PageSwapCache(req->wb_page))) {
 		set_bit(PG_MAPPED, &req->wb_flags);
 		SetPagePrivate(req->wb_page);
 		set_page_private(req->wb_page, (unsigned long)req);
 	}
-	nfsi->nrequests++;
+	spin_unlock(&mapping->private_lock);
+	atomic_long_inc(&nfsi->nrequests);
 	/* this a head request for a page group - mark it as having an
 	 * extra reference so sub groups can follow suit.
 	 * This flag also informs pgio layer when to bump nrequests when
 	 * adding subrequests. */
 	WARN_ON(test_and_set_bit(PG_INODE_REF, &req->wb_flags));
 	kref_get(&req->wb_kref);
-	spin_unlock(&inode->i_lock);
 }
 
 /*
@@ -794,25 +782,22 @@ static void nfs_inode_add_request(struct inode *inode, struct nfs_page *req)
 */
 static void nfs_inode_remove_request(struct nfs_page *req)
 {
-	struct inode *inode = d_inode(req->wb_context->dentry);
+	struct address_space *mapping = page_file_mapping(req->wb_page);
+	struct inode *inode = mapping->host;
 	struct nfs_inode *nfsi = NFS_I(inode);
 	struct nfs_page *head;
 
+	atomic_long_dec(&nfsi->nrequests);
 	if (nfs_page_group_sync_on_bit(req, PG_REMOVE)) {
 		head = req->wb_head;
 
-		spin_lock(&inode->i_lock);
+		spin_lock(&mapping->private_lock);
 		if (likely(head->wb_page && !PageSwapCache(head->wb_page))) {
 			set_page_private(head->wb_page, 0);
 			ClearPagePrivate(head->wb_page);
 			clear_bit(PG_MAPPED, &head->wb_flags);
 		}
-		nfsi->nrequests--;
-		spin_unlock(&inode->i_lock);
-	} else {
-		spin_lock(&inode->i_lock);
-		nfsi->nrequests--;
-		spin_unlock(&inode->i_lock);
+		spin_unlock(&mapping->private_lock);
 	}
 
 	if (test_and_clear_bit(PG_INODE_REF, &req->wb_flags))
@@ -868,7 +853,8 @@ nfs_page_search_commits_for_head_request_locked(struct nfs_inode *nfsi,
 * number of outstanding requests requiring a commit as well as
 * the MM page stats.
 *
- * The caller must hold cinfo->inode->i_lock, and the nfs_page lock.
+ * The caller must hold NFS_I(cinfo->inode)->commit_mutex, and the
+ * nfs_page lock.
 */
 void
 nfs_request_add_commit_list_locked(struct nfs_page *req, struct list_head *dst,
@@ -876,7 +862,7 @@ nfs_request_add_commit_list_locked(struct nfs_page *req, struct list_head *dst,
 {
 	set_bit(PG_CLEAN, &req->wb_flags);
 	nfs_list_add_request(req, dst);
-	cinfo->mds->ncommit++;
+	atomic_long_inc(&cinfo->mds->ncommit);
 }
 EXPORT_SYMBOL_GPL(nfs_request_add_commit_list_locked);
@@ -896,9 +882,9 @@ EXPORT_SYMBOL_GPL(nfs_request_add_commit_list_locked);
 void
 nfs_request_add_commit_list(struct nfs_page *req, struct nfs_commit_info *cinfo)
 {
-	spin_lock(&cinfo->inode->i_lock);
+	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
 	nfs_request_add_commit_list_locked(req, &cinfo->mds->list, cinfo);
-	spin_unlock(&cinfo->inode->i_lock);
+	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 	if (req->wb_page)
 		nfs_mark_page_unstable(req->wb_page, cinfo);
 }
@@ -922,7 +908,7 @@ nfs_request_remove_commit_list(struct nfs_page *req,
 	if (!test_and_clear_bit(PG_CLEAN, &(req)->wb_flags))
 		return;
 	nfs_list_remove_request(req);
-	cinfo->mds->ncommit--;
+	atomic_long_dec(&cinfo->mds->ncommit);
 }
 EXPORT_SYMBOL_GPL(nfs_request_remove_commit_list);
@@ -967,7 +953,7 @@ nfs_clear_page_commit(struct page *page)
 			    WB_RECLAIMABLE);
 }
 
-/* Called holding inode (/cinfo) lock */
+/* Called holding the request lock on @req */
 static void
 nfs_clear_request_commit(struct nfs_page *req)
 {
@@ -976,9 +962,11 @@ nfs_clear_request_commit(struct nfs_page *req)
 		struct nfs_commit_info cinfo;
 
 		nfs_init_cinfo_from_inode(&cinfo, inode);
+		mutex_lock(&NFS_I(inode)->commit_mutex);
 		if (!pnfs_clear_request_commit(req, &cinfo)) {
 			nfs_request_remove_commit_list(req, &cinfo);
 		}
+		mutex_unlock(&NFS_I(inode)->commit_mutex);
 		nfs_clear_page_commit(req->wb_page);
 	}
 }
@@ -1023,7 +1011,6 @@ static void nfs_write_completion(struct nfs_pgio_header *hdr)
 remove_req:
 		nfs_inode_remove_request(req);
 next:
-		nfs_unlock_request(req);
 		nfs_end_page_writeback(req);
 		nfs_release_request(req);
 	}
@@ -1035,28 +1022,36 @@ static void nfs_write_completion(struct nfs_pgio_header *hdr)
 unsigned long
 nfs_reqs_to_commit(struct nfs_commit_info *cinfo)
 {
-	return cinfo->mds->ncommit;
+	return atomic_long_read(&cinfo->mds->ncommit);
 }
 
-/* cinfo->inode->i_lock held by caller */
+/* NFS_I(cinfo->inode)->commit_mutex held by caller */
 int
 nfs_scan_commit_list(struct list_head *src, struct list_head *dst,
 		     struct nfs_commit_info *cinfo, int max)
 {
-	struct nfs_page *req, *tmp;
+	struct nfs_page *req;
 	int ret = 0;
 
-	list_for_each_entry_safe(req, tmp, src, wb_list) {
-		if (!nfs_lock_request(req))
-			continue;
+	while(!list_empty(src)) {
+		req = list_first_entry(src, struct nfs_page, wb_list);
 		kref_get(&req->wb_kref);
-		if (cond_resched_lock(&cinfo->inode->i_lock))
-			list_safe_reset_next(req, tmp, wb_list);
+		if (!nfs_lock_request(req)) {
+			int status;
+
+			mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
+			status = nfs_wait_on_request(req);
+			nfs_release_request(req);
+			mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
+			if (status < 0)
+				break;
+			continue;
+		}
 		nfs_request_remove_commit_list(req, cinfo);
 		nfs_list_add_request(req, dst);
 		ret++;
 		if ((ret == max) && !cinfo->dreq)
 			break;
+		cond_resched();
 	}
 	return ret;
 }
@@ -1076,15 +1071,17 @@ nfs_scan_commit(struct inode *inode, struct list_head *dst,
 {
 	int ret = 0;
 
-	spin_lock(&cinfo->inode->i_lock);
-	if (cinfo->mds->ncommit > 0) {
+	if (!atomic_long_read(&cinfo->mds->ncommit))
+		return 0;
+	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
+	if (atomic_long_read(&cinfo->mds->ncommit) > 0) {
 		const int max = INT_MAX;
 
 		ret = nfs_scan_commit_list(&cinfo->mds->list, dst,
 					   cinfo, max);
 		ret += pnfs_scan_commit_lists(inode, cinfo, max - ret);
 	}
-	spin_unlock(&cinfo->inode->i_lock);
+	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 	return ret;
 }
@@ -1105,43 +1102,21 @@ static struct nfs_page *nfs_try_to_update_request(struct inode *inode,
 	unsigned int end;
 	int error;
 
-	if (!PagePrivate(page))
-		return NULL;
-
 	end = offset + bytes;
-	spin_lock(&inode->i_lock);
 
-	for (;;) {
-		req = nfs_page_find_head_request_locked(NFS_I(inode), page);
-		if (req == NULL)
-			goto out_unlock;
-
-		/* should be handled by nfs_flush_incompatible */
-		WARN_ON_ONCE(req->wb_head != req);
-		WARN_ON_ONCE(req->wb_this_page != req);
-
-		rqend = req->wb_offset + req->wb_bytes;
-		/*
-		 * Tell the caller to flush out the request if
-		 * the offsets are non-contiguous.
-		 * Note: nfs_flush_incompatible() will already
-		 * have flushed out requests having wrong owners.
-		 */
-		if (offset > rqend
-		    || end < req->wb_offset)
-			goto out_flushme;
-
-		if (nfs_lock_request(req))
-			break;
+	req = nfs_lock_and_join_requests(page);
+	if (IS_ERR_OR_NULL(req))
+		return req;
 
-		/* The request is locked, so wait and then retry */
-		spin_unlock(&inode->i_lock);
-		error = nfs_wait_on_request(req);
-		nfs_release_request(req);
-		if (error != 0)
-			goto out_err;
-		spin_lock(&inode->i_lock);
-	}
+	rqend = req->wb_offset + req->wb_bytes;
+	/*
+	 * Tell the caller to flush out the request if
+	 * the offsets are non-contiguous.
+	 * Note: nfs_flush_incompatible() will already
+	 * have flushed out requests having wrong owners.
+	 */
+	if (offset > rqend || end < req->wb_offset)
+		goto out_flushme;
 
 	/* Okay, the request matches. Update the region */
 	if (offset < req->wb_offset) {
@@ -1152,17 +1127,17 @@ static struct nfs_page *nfs_try_to_update_request(struct inode *inode,
 		req->wb_bytes = end - req->wb_offset;
 	else
 		req->wb_bytes = rqend - req->wb_offset;
-out_unlock:
-	if (req)
-		nfs_clear_request_commit(req);
-	spin_unlock(&inode->i_lock);
 	return req;
 out_flushme:
-	spin_unlock(&inode->i_lock);
-	nfs_release_request(req);
+	/*
+	 * Note: we mark the request dirty here because
+	 * nfs_lock_and_join_requests() cannot preserve
+	 * commit flags, so we have to replay the write.
+	 */
+	nfs_mark_request_dirty(req);
+	nfs_unlock_and_release_request(req);
 	error = nfs_wb_page(inode, page);
-out_err:
-	return ERR_PTR(error);
+	return (error < 0) ? ERR_PTR(error) : NULL;
 }
 
 /*
@@ -1227,8 +1202,6 @@ int nfs_flush_incompatible(struct file *file, struct page *page)
 		l_ctx = req->wb_lock_context;
 		do_flush = req->wb_page != page ||
 			!nfs_match_open_context(req->wb_context, ctx);
-		/* for now, flush if more than 1 request in page_group */
-		do_flush |= req->wb_this_page != req;
 		if (l_ctx && flctx &&
 		    !(list_empty_careful(&flctx->flc_posix) &&
 		      list_empty_careful(&flctx->flc_flock))) {
@@ -1412,7 +1385,6 @@ static void nfs_redirty_request(struct nfs_page *req)
 {
 	nfs_mark_request_dirty(req);
 	set_bit(NFS_CONTEXT_RESEND_WRITES, &req->wb_context->flags);
-	nfs_unlock_request(req);
 	nfs_end_page_writeback(req);
 	nfs_release_request(req);
 }
@@ -1934,7 +1906,7 @@ int nfs_write_inode(struct inode *inode, struct writeback_control *wbc)
 	int ret = 0;
 
 	/* no commits means nothing needs to be done */
-	if (!nfsi->commit_info.ncommit)
+	if (!atomic_long_read(&nfsi->commit_info.ncommit))
 		return ret;
 
 	if (wbc->sync_mode == WB_SYNC_NONE) {
@@ -2015,7 +1987,7 @@ int nfs_wb_page_cancel(struct inode *inode, struct page *page)
 	/* blocking call to cancel all requests and join to a single (head)
 	 * request */
-	req = nfs_lock_and_join_requests(page, false);
+	req = nfs_lock_and_join_requests(page);
 
 	if (IS_ERR(req)) {
 		ret = PTR_ERR(req);
......
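Note: taken together, the fs/nfs/write.c hunks split the old monolithic inode->i_lock into several narrower locks. A summary of the resulting rules, written as a C comment for reference (our distillation of the hunks above, not a comment that exists in the tree):

	/*
	 * After this merge (distilled from the hunks above):
	 *
	 *  mapping->private_lock        - guards the struct page <-> struct
	 *                                 nfs_page association (PagePrivate)
	 *  NFS_I(inode)->commit_mutex   - guards the commit lists; holders may
	 *                                 sleep, e.g. in nfs_wait_on_request()
	 *  PG_HEADLOCK (group bit lock) - guards the page-group list; taken
	 *                                 via nfs_page_group_lock()
	 *  nrequests / ncommit          - plain atomic_long_t counters, read
	 *                                 and updated without any lock
	 */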
--- a/include/linux/nfs_fs.h
+++ b/include/linux/nfs_fs.h
@@ -154,7 +154,7 @@ struct nfs_inode {
 	 */
 	__be32			cookieverf[2];
 
-	unsigned long		nrequests;
+	atomic_long_t		nrequests;
 	struct nfs_mds_commit_info commit_info;
 
 	/* Open contexts for shared mmap writes */
@@ -163,6 +163,7 @@ struct nfs_inode {
 	/* Readers: in-flight sillydelete RPC calls */
 	/* Writers: rmdir */
 	struct rw_semaphore	rmdir_sem;
+	struct mutex		commit_mutex;
 
 #if IS_ENABLED(CONFIG_NFS_V4)
 	struct nfs4_cached_acl	*nfs4_acl;
@@ -510,7 +511,7 @@ extern void nfs_commit_free(struct nfs_commit_data *data);
 static inline int
 nfs_have_writebacks(struct inode *inode)
 {
-	return NFS_I(inode)->nrequests != 0;
+	return atomic_long_read(&NFS_I(inode)->nrequests) != 0;
 }
 
 /*
......
--- a/include/linux/nfs_page.h
+++ b/include/linux/nfs_page.h
@@ -139,8 +139,7 @@ extern size_t nfs_generic_pg_test(struct nfs_pageio_descriptor *desc,
 extern int nfs_wait_on_request(struct nfs_page *);
 extern void nfs_unlock_request(struct nfs_page *req);
 extern void nfs_unlock_and_release_request(struct nfs_page *);
-extern int nfs_page_group_lock(struct nfs_page *, bool);
-extern void nfs_page_group_lock_wait(struct nfs_page *);
+extern int nfs_page_group_lock(struct nfs_page *);
 extern void nfs_page_group_unlock(struct nfs_page *);
 extern bool nfs_page_group_sync_on_bit(struct nfs_page *, unsigned int);
 extern bool nfs_async_iocounter_wait(struct rpc_task *, struct nfs_lock_context *);
......
--- a/include/linux/nfs_xdr.h
+++ b/include/linux/nfs_xdr.h
@@ -1476,7 +1476,7 @@ struct nfs_pgio_header {
 struct nfs_mds_commit_info {
 	atomic_t rpcs_out;
-	unsigned long		ncommit;
+	atomic_long_t		ncommit;
 	struct list_head	list;
 };
......
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -139,6 +139,8 @@ struct rpc_task_setup {
 #define RPC_TASK_RUNNING	0
 #define RPC_TASK_QUEUED		1
 #define RPC_TASK_ACTIVE		2
+#define RPC_TASK_MSG_RECV	3
+#define RPC_TASK_MSG_RECV_WAIT	4
 
 #define RPC_IS_RUNNING(t)	test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
 #define rpc_set_running(t)	set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
......
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -232,6 +232,7 @@ struct rpc_xprt {
 	 */
 	spinlock_t		transport_lock;	/* lock transport info */
 	spinlock_t		reserve_lock;	/* lock slot table */
+	spinlock_t		recv_lock;	/* lock receive list */
 	u32			xid;		/* Next XID value to use */
 	struct rpc_task *	snd_task;	/* Task blocked in send */
 	struct svc_xprt		*bc_xprt;	/* NFSv4.1 backchannel */
@@ -372,6 +373,8 @@ void xprt_write_space(struct rpc_xprt *xprt);
 void			xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result);
 struct rpc_rqst *	xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid);
 void			xprt_complete_rqst(struct rpc_task *task, int copied);
+void			xprt_pin_rqst(struct rpc_rqst *req);
+void			xprt_unpin_rqst(struct rpc_rqst *req);
 void			xprt_release_rqst_cong(struct rpc_task *task);
 void			xprt_disconnect_done(struct rpc_xprt *xprt);
 void			xprt_force_disconnect(struct rpc_xprt *xprt);
......
...@@ -171,10 +171,10 @@ int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs) ...@@ -171,10 +171,10 @@ int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
/* /*
* Add the temporary list to the backchannel preallocation list * Add the temporary list to the backchannel preallocation list
*/ */
spin_lock_bh(&xprt->bc_pa_lock); spin_lock(&xprt->bc_pa_lock);
list_splice(&tmp_list, &xprt->bc_pa_list); list_splice(&tmp_list, &xprt->bc_pa_list);
xprt_inc_alloc_count(xprt, min_reqs); xprt_inc_alloc_count(xprt, min_reqs);
spin_unlock_bh(&xprt->bc_pa_lock); spin_unlock(&xprt->bc_pa_lock);
dprintk("RPC: setup backchannel transport done\n"); dprintk("RPC: setup backchannel transport done\n");
return 0; return 0;
......
...@@ -1001,7 +1001,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp) ...@@ -1001,7 +1001,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
if (!bc_xprt) if (!bc_xprt)
return -EAGAIN; return -EAGAIN;
spin_lock_bh(&bc_xprt->transport_lock); spin_lock(&bc_xprt->recv_lock);
req = xprt_lookup_rqst(bc_xprt, xid); req = xprt_lookup_rqst(bc_xprt, xid);
if (!req) if (!req)
goto unlock_notfound; goto unlock_notfound;
...@@ -1019,7 +1019,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp) ...@@ -1019,7 +1019,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
memcpy(dst->iov_base, src->iov_base, src->iov_len); memcpy(dst->iov_base, src->iov_base, src->iov_len);
xprt_complete_rqst(req->rq_task, rqstp->rq_arg.len); xprt_complete_rqst(req->rq_task, rqstp->rq_arg.len);
rqstp->rq_arg.len = 0; rqstp->rq_arg.len = 0;
spin_unlock_bh(&bc_xprt->transport_lock); spin_unlock(&bc_xprt->recv_lock);
return 0; return 0;
unlock_notfound: unlock_notfound:
printk(KERN_NOTICE printk(KERN_NOTICE
...@@ -1028,7 +1028,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp) ...@@ -1028,7 +1028,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
__func__, ntohl(calldir), __func__, ntohl(calldir),
bc_xprt, ntohl(xid)); bc_xprt, ntohl(xid));
unlock_eagain: unlock_eagain:
spin_unlock_bh(&bc_xprt->transport_lock); spin_unlock(&bc_xprt->recv_lock);
return -EAGAIN; return -EAGAIN;
} }
......
...@@ -844,6 +844,48 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) ...@@ -844,6 +844,48 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
} }
EXPORT_SYMBOL_GPL(xprt_lookup_rqst); EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
/**
* xprt_pin_rqst - Pin a request on the transport receive list
* @req: Request to pin
*
* Caller must ensure this is atomic with the call to xprt_lookup_rqst()
 * so should be holding the xprt->recv_lock.
*/
void xprt_pin_rqst(struct rpc_rqst *req)
{
set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate);
}
/**
* xprt_unpin_rqst - Unpin a request on the transport receive list
 * @req: Request to unpin
*
 * Caller should be holding the xprt->recv_lock.
*/
void xprt_unpin_rqst(struct rpc_rqst *req)
{
struct rpc_task *task = req->rq_task;
clear_bit(RPC_TASK_MSG_RECV, &task->tk_runstate);
if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate))
wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV);
}
static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
__must_hold(&req->rq_xprt->recv_lock)
{
struct rpc_task *task = req->rq_task;
if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) {
spin_unlock(&req->rq_xprt->recv_lock);
set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV,
TASK_UNINTERRUPTIBLE);
clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
spin_lock(&req->rq_xprt->recv_lock);
}
}
static void xprt_update_rtt(struct rpc_task *task) static void xprt_update_rtt(struct rpc_task *task)
{ {
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
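xprt_pin_rqst()/xprt_unpin_rqst() let a receive path keep a request alive after dropping recv_lock: the reader pins the request under the lock, releases the lock while copying data, then re-takes it to complete the RPC, while xprt_release() refuses to free a request until its pin is gone. A user-space model of that handshake, with a pthread mutex standing in for recv_lock and a condition variable standing in for wait_on_bit()/wake_up_bit() (all names here are illustrative, not kernel API):

	#include <pthread.h>
	#include <stdbool.h>

	/* Toy request: the pin flag is guarded by recv_lock. */
	struct toy_rqst {
		pthread_mutex_t recv_lock;
		pthread_cond_t  unpinned;
		bool pinned;
	};

	/* Receive side, like xprt_pin_rqst(): caller holds recv_lock. */
	static void pin_rqst(struct toy_rqst *req)
	{
		req->pinned = true;
	}

	/* Receive side, like xprt_unpin_rqst(): caller holds recv_lock. */
	static void unpin_rqst(struct toy_rqst *req)
	{
		req->pinned = false;
		pthread_cond_broadcast(&req->unpinned);
	}

	/* Release side, like xprt_wait_on_pinned_rqst(): called with
	 * recv_lock held, sleeps until no receiver still holds a pin. */
	static void wait_on_pinned_rqst(struct toy_rqst *req)
	{
		while (req->pinned)
			pthread_cond_wait(&req->unpinned, &req->recv_lock);
	}

	static struct toy_rqst rq = {
		.recv_lock = PTHREAD_MUTEX_INITIALIZER,
		.unpinned  = PTHREAD_COND_INITIALIZER,
	};

	/* Single-threaded walk through the flow of xs_udp_data_read_skb()
	 * further down: pin under the lock, copy unlocked, relock, unpin. */
	int main(void)
	{
		pthread_mutex_lock(&rq.recv_lock);
		pin_rqst(&rq);			/* reader found the request */
		pthread_mutex_unlock(&rq.recv_lock);

		/* ... reply data is copied here with no lock held ... */

		pthread_mutex_lock(&rq.recv_lock);
		unpin_rqst(&rq);		/* reader is done */
		wait_on_pinned_rqst(&rq);	/* release path: returns at once */
		pthread_mutex_unlock(&rq.recv_lock);
		return 0;
	}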
...@@ -966,13 +1008,13 @@ void xprt_transmit(struct rpc_task *task) ...@@ -966,13 +1008,13 @@ void xprt_transmit(struct rpc_task *task)
/* /*
* Add to the list only if we're expecting a reply * Add to the list only if we're expecting a reply
*/ */
spin_lock_bh(&xprt->transport_lock);
/* Update the softirq receive buffer */ /* Update the softirq receive buffer */
memcpy(&req->rq_private_buf, &req->rq_rcv_buf, memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
sizeof(req->rq_private_buf)); sizeof(req->rq_private_buf));
/* Add request to the receive list */ /* Add request to the receive list */
spin_lock(&xprt->recv_lock);
list_add_tail(&req->rq_list, &xprt->recv); list_add_tail(&req->rq_list, &xprt->recv);
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->recv_lock);
xprt_reset_majortimeo(req); xprt_reset_majortimeo(req);
/* Turn off autodisconnect */ /* Turn off autodisconnect */
del_singleshot_timer_sync(&xprt->timer); del_singleshot_timer_sync(&xprt->timer);
...@@ -1287,12 +1329,16 @@ void xprt_release(struct rpc_task *task) ...@@ -1287,12 +1329,16 @@ void xprt_release(struct rpc_task *task)
task->tk_ops->rpc_count_stats(task, task->tk_calldata); task->tk_ops->rpc_count_stats(task, task->tk_calldata);
else if (task->tk_client) else if (task->tk_client)
rpc_count_iostats(task, task->tk_client->cl_metrics); rpc_count_iostats(task, task->tk_client->cl_metrics);
spin_lock(&xprt->recv_lock);
if (!list_empty(&req->rq_list)) {
list_del(&req->rq_list);
xprt_wait_on_pinned_rqst(req);
}
spin_unlock(&xprt->recv_lock);
spin_lock_bh(&xprt->transport_lock); spin_lock_bh(&xprt->transport_lock);
xprt->ops->release_xprt(xprt, task); xprt->ops->release_xprt(xprt, task);
if (xprt->ops->release_request) if (xprt->ops->release_request)
xprt->ops->release_request(task); xprt->ops->release_request(task);
if (!list_empty(&req->rq_list))
list_del(&req->rq_list);
xprt->last_used = jiffies; xprt->last_used = jiffies;
xprt_schedule_autodisconnect(xprt); xprt_schedule_autodisconnect(xprt);
spin_unlock_bh(&xprt->transport_lock); spin_unlock_bh(&xprt->transport_lock);
...@@ -1318,6 +1364,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net) ...@@ -1318,6 +1364,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
spin_lock_init(&xprt->transport_lock); spin_lock_init(&xprt->transport_lock);
spin_lock_init(&xprt->reserve_lock); spin_lock_init(&xprt->reserve_lock);
spin_lock_init(&xprt->recv_lock);
INIT_LIST_HEAD(&xprt->free); INIT_LIST_HEAD(&xprt->free);
INIT_LIST_HEAD(&xprt->recv); INIT_LIST_HEAD(&xprt->recv);
......
...@@ -1051,7 +1051,7 @@ rpcrdma_reply_handler(struct work_struct *work) ...@@ -1051,7 +1051,7 @@ rpcrdma_reply_handler(struct work_struct *work)
* RPC completion while holding the transport lock to ensure * RPC completion while holding the transport lock to ensure
* the rep, rqst, and rq_task pointers remain stable. * the rep, rqst, and rq_task pointers remain stable.
*/ */
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->recv_lock);
rqst = xprt_lookup_rqst(xprt, headerp->rm_xid); rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
if (!rqst) if (!rqst)
goto out_norqst; goto out_norqst;
...@@ -1136,7 +1136,7 @@ rpcrdma_reply_handler(struct work_struct *work) ...@@ -1136,7 +1136,7 @@ rpcrdma_reply_handler(struct work_struct *work)
xprt_release_rqst_cong(rqst->rq_task); xprt_release_rqst_cong(rqst->rq_task);
xprt_complete_rqst(rqst->rq_task, status); xprt_complete_rqst(rqst->rq_task, status);
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->recv_lock);
dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n", dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
__func__, xprt, rqst, status); __func__, xprt, rqst, status);
return; return;
...@@ -1187,12 +1187,12 @@ rpcrdma_reply_handler(struct work_struct *work) ...@@ -1187,12 +1187,12 @@ rpcrdma_reply_handler(struct work_struct *work)
r_xprt->rx_stats.bad_reply_count++; r_xprt->rx_stats.bad_reply_count++;
goto out; goto out;
/* The req was still available, but by the time the transport_lock /* The req was still available, but by the time the recv_lock
* was acquired, the rqst and task had been released. Thus the RPC * was acquired, the rqst and task had been released. Thus the RPC
* has already been terminated. * has already been terminated.
*/ */
out_norqst: out_norqst:
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->recv_lock);
rpcrdma_buffer_put(req); rpcrdma_buffer_put(req);
dprintk("RPC: %s: race, no rqst left for req %p\n", dprintk("RPC: %s: race, no rqst left for req %p\n",
__func__, req); __func__, req);
......
...@@ -52,7 +52,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp, ...@@ -52,7 +52,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
if (src->iov_len < 24) if (src->iov_len < 24)
goto out_shortreply; goto out_shortreply;
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->recv_lock);
req = xprt_lookup_rqst(xprt, xid); req = xprt_lookup_rqst(xprt, xid);
if (!req) if (!req)
goto out_notfound; goto out_notfound;
...@@ -69,17 +69,20 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp, ...@@ -69,17 +69,20 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
else if (credits > r_xprt->rx_buf.rb_bc_max_requests) else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
credits = r_xprt->rx_buf.rb_bc_max_requests; credits = r_xprt->rx_buf.rb_bc_max_requests;
spin_lock_bh(&xprt->transport_lock);
cwnd = xprt->cwnd; cwnd = xprt->cwnd;
xprt->cwnd = credits << RPC_CWNDSHIFT; xprt->cwnd = credits << RPC_CWNDSHIFT;
if (xprt->cwnd > cwnd) if (xprt->cwnd > cwnd)
xprt_release_rqst_cong(req->rq_task); xprt_release_rqst_cong(req->rq_task);
spin_unlock_bh(&xprt->transport_lock);
ret = 0; ret = 0;
xprt_complete_rqst(req->rq_task, rcvbuf->len); xprt_complete_rqst(req->rq_task, rcvbuf->len);
rcvbuf->len = 0; rcvbuf->len = 0;
out_unlock: out_unlock:
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->recv_lock);
out: out:
return ret; return ret;
......
...@@ -969,10 +969,12 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt, ...@@ -969,10 +969,12 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,
return; return;
/* Look up and lock the request corresponding to the given XID */ /* Look up and lock the request corresponding to the given XID */
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->recv_lock);
rovr = xprt_lookup_rqst(xprt, *xp); rovr = xprt_lookup_rqst(xprt, *xp);
if (!rovr) if (!rovr)
goto out_unlock; goto out_unlock;
xprt_pin_rqst(rovr);
spin_unlock(&xprt->recv_lock);
task = rovr->rq_task; task = rovr->rq_task;
copied = rovr->rq_private_buf.buflen; copied = rovr->rq_private_buf.buflen;
...@@ -981,13 +983,16 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt, ...@@ -981,13 +983,16 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,
if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) { if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
dprintk("RPC: sk_buff copy failed\n"); dprintk("RPC: sk_buff copy failed\n");
goto out_unlock; spin_lock(&xprt->recv_lock);
goto out_unpin;
} }
spin_lock(&xprt->recv_lock);
xprt_complete_rqst(task, copied); xprt_complete_rqst(task, copied);
out_unpin:
xprt_unpin_rqst(rovr);
out_unlock: out_unlock:
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->recv_lock);
} }
static void xs_local_data_receive(struct sock_xprt *transport) static void xs_local_data_receive(struct sock_xprt *transport)
...@@ -1050,10 +1055,12 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt, ...@@ -1050,10 +1055,12 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
return; return;
/* Look up and lock the request corresponding to the given XID */ /* Look up and lock the request corresponding to the given XID */
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->recv_lock);
rovr = xprt_lookup_rqst(xprt, *xp); rovr = xprt_lookup_rqst(xprt, *xp);
if (!rovr) if (!rovr)
goto out_unlock; goto out_unlock;
xprt_pin_rqst(rovr);
spin_unlock(&xprt->recv_lock);
task = rovr->rq_task; task = rovr->rq_task;
if ((copied = rovr->rq_private_buf.buflen) > repsize) if ((copied = rovr->rq_private_buf.buflen) > repsize)
...@@ -1062,16 +1069,21 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt, ...@@ -1062,16 +1069,21 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
/* Suck it into the iovec, verify checksum if not done by hw. */ /* Suck it into the iovec, verify checksum if not done by hw. */
if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) { if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
__UDPX_INC_STATS(sk, UDP_MIB_INERRORS); __UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
goto out_unlock; spin_lock(&xprt->recv_lock);
goto out_unpin;
} }
__UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS); __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
spin_lock_bh(&xprt->transport_lock);
xprt_adjust_cwnd(xprt, task, copied); xprt_adjust_cwnd(xprt, task, copied);
spin_unlock_bh(&xprt->transport_lock);
spin_lock(&xprt->recv_lock);
xprt_complete_rqst(task, copied); xprt_complete_rqst(task, copied);
out_unpin:
xprt_unpin_rqst(rovr);
out_unlock: out_unlock:
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->recv_lock);
} }
static void xs_udp_data_receive(struct sock_xprt *transport) static void xs_udp_data_receive(struct sock_xprt *transport)
...@@ -1277,25 +1289,12 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt, ...@@ -1277,25 +1289,12 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
} }
len = desc->count; len = desc->count;
if (len > transport->tcp_reclen - transport->tcp_offset) { if (len > transport->tcp_reclen - transport->tcp_offset)
struct xdr_skb_reader my_desc; desc->count = transport->tcp_reclen - transport->tcp_offset;
r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
len = transport->tcp_reclen - transport->tcp_offset;
memcpy(&my_desc, desc, sizeof(my_desc));
my_desc.count = len;
r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
&my_desc, xdr_skb_read_bits);
desc->count -= r;
desc->offset += r;
} else
r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
desc, xdr_skb_read_bits); desc, xdr_skb_read_bits);
if (r > 0) { if (desc->count) {
transport->tcp_copied += r;
transport->tcp_offset += r;
}
if (r != len) {
/* Error when copying to the receive buffer, /* Error when copying to the receive buffer,
* usually because we weren't able to allocate * usually because we weren't able to allocate
* additional buffer pages. All we can do now * additional buffer pages. All we can do now
...@@ -1315,6 +1314,10 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt, ...@@ -1315,6 +1314,10 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
return; return;
} }
transport->tcp_copied += r;
transport->tcp_offset += r;
desc->count = len - r;
dprintk("RPC: XID %08x read %zd bytes\n", dprintk("RPC: XID %08x read %zd bytes\n",
ntohl(transport->tcp_xid), r); ntohl(transport->tcp_xid), r);
dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, " dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
...@@ -1343,21 +1346,24 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, ...@@ -1343,21 +1346,24 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid));
/* Find and lock the request corresponding to this xid */ /* Find and lock the request corresponding to this xid */
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->recv_lock);
req = xprt_lookup_rqst(xprt, transport->tcp_xid); req = xprt_lookup_rqst(xprt, transport->tcp_xid);
if (!req) { if (!req) {
dprintk("RPC: XID %08x request not found!\n", dprintk("RPC: XID %08x request not found!\n",
ntohl(transport->tcp_xid)); ntohl(transport->tcp_xid));
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->recv_lock);
return -1; return -1;
} }
xprt_pin_rqst(req);
spin_unlock(&xprt->recv_lock);
xs_tcp_read_common(xprt, desc, req); xs_tcp_read_common(xprt, desc, req);
spin_lock(&xprt->recv_lock);
if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
xprt_complete_rqst(req->rq_task, transport->tcp_copied); xprt_complete_rqst(req->rq_task, transport->tcp_copied);
xprt_unpin_rqst(req);
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->recv_lock);
return 0; return 0;
} }
...@@ -1376,11 +1382,9 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt, ...@@ -1376,11 +1382,9 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
container_of(xprt, struct sock_xprt, xprt); container_of(xprt, struct sock_xprt, xprt);
struct rpc_rqst *req; struct rpc_rqst *req;
/* Look up and lock the request corresponding to the given XID */ /* Look up the request corresponding to the given XID */
spin_lock_bh(&xprt->transport_lock);
req = xprt_lookup_bc_request(xprt, transport->tcp_xid); req = xprt_lookup_bc_request(xprt, transport->tcp_xid);
if (req == NULL) { if (req == NULL) {
spin_unlock_bh(&xprt->transport_lock);
printk(KERN_WARNING "Callback slot table overflowed\n"); printk(KERN_WARNING "Callback slot table overflowed\n");
xprt_force_disconnect(xprt); xprt_force_disconnect(xprt);
return -1; return -1;
...@@ -1391,7 +1395,6 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt, ...@@ -1391,7 +1395,6 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
xprt_complete_bc_request(req, transport->tcp_copied); xprt_complete_bc_request(req, transport->tcp_copied);
spin_unlock_bh(&xprt->transport_lock);
return 0; return 0;
} }
...@@ -1516,6 +1519,7 @@ static void xs_tcp_data_receive(struct sock_xprt *transport) ...@@ -1516,6 +1519,7 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
.arg.data = xprt, .arg.data = xprt,
}; };
unsigned long total = 0; unsigned long total = 0;
int loop;
int read = 0; int read = 0;
mutex_lock(&transport->recv_mutex); mutex_lock(&transport->recv_mutex);
...@@ -1524,20 +1528,20 @@ static void xs_tcp_data_receive(struct sock_xprt *transport) ...@@ -1524,20 +1528,20 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
goto out; goto out;
/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
for (;;) { for (loop = 0; loop < 64; loop++) {
lock_sock(sk); lock_sock(sk);
read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
if (read <= 0) { if (read <= 0) {
clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
release_sock(sk); release_sock(sk);
if (!test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) break;
break;
} else {
release_sock(sk);
total += read;
} }
release_sock(sk);
total += read;
rd_desc.count = 65536; rd_desc.count = 65536;
} }
if (test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
queue_work(xprtiod_workqueue, &transport->recv_worker);
out: out:
mutex_unlock(&transport->recv_mutex); mutex_unlock(&transport->recv_mutex);
trace_xs_tcp_data_ready(xprt, read, total); trace_xs_tcp_data_ready(xprt, read, total);
......
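The last hunk replaces the unbounded for (;;) in xs_tcp_data_receive() with a 64-iteration budget: if the socket is still marked ready when the budget runs out, the worker requeues itself on xprtiod_workqueue rather than monopolizing the workqueue thread. A minimal model of that budgeted-poll pattern (read_batch() and requeue_worker() are illustrative stand-ins):

	#include <stdbool.h>
	#include <stdio.h>

	static bool data_ready = true;		/* models XPRT_SOCK_DATA_READY */

	/* Pretend 100 batches of data are queued on the socket. */
	static int read_batch(void)
	{
		static int batches = 100;
		return batches-- > 0 ? 1 : 0;
	}

	static void requeue_worker(void)
	{
		printf("budget spent, data still ready: requeueing worker\n");
	}

	static void receive_worker(void)
	{
		int loop;

		for (loop = 0; loop < 64; loop++) {
			if (read_batch() <= 0) {
				data_ready = false;	/* clear_bit(XPRT_SOCK_DATA_READY) */
				break;
			}
		}
		/* Like the new tail of xs_tcp_data_receive(). */
		if (data_ready)
			requeue_worker();
	}

	int main(void)
	{
		receive_worker();
		return 0;
	}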