Commit 9d96acbc authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Add a bvec array to struct xdr_buf for use with iovec_iter()

Add a bvec array to struct xdr_buf, and have the client allocate it
when we need to receive data into pages.
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@hammerspace.com>
parent 431f6eb3
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <asm/unaligned.h> #include <asm/unaligned.h>
#include <linux/scatterlist.h> #include <linux/scatterlist.h>
struct bio_vec;
struct rpc_rqst; struct rpc_rqst;
/* /*
...@@ -52,6 +53,7 @@ struct xdr_buf { ...@@ -52,6 +53,7 @@ struct xdr_buf {
struct kvec head[1], /* RPC header + non-page data */ struct kvec head[1], /* RPC header + non-page data */
tail[1]; /* Appended after page data */ tail[1]; /* Appended after page data */
struct bio_vec *bvec;
struct page ** pages; /* Array of pages */ struct page ** pages; /* Array of pages */
unsigned int page_base, /* Start of page data */ unsigned int page_base, /* Start of page data */
page_len, /* Length of page data */ page_len, /* Length of page data */
...@@ -70,6 +72,8 @@ xdr_buf_init(struct xdr_buf *buf, void *start, size_t len) ...@@ -70,6 +72,8 @@ xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
buf->head[0].iov_base = start; buf->head[0].iov_base = start;
buf->head[0].iov_len = len; buf->head[0].iov_len = len;
buf->tail[0].iov_len = 0; buf->tail[0].iov_len = 0;
buf->bvec = NULL;
buf->pages = NULL;
buf->page_len = 0; buf->page_len = 0;
buf->flags = 0; buf->flags = 0;
buf->len = 0; buf->len = 0;
...@@ -116,6 +120,9 @@ __be32 *xdr_decode_netobj(__be32 *p, struct xdr_netobj *); ...@@ -116,6 +120,9 @@ __be32 *xdr_decode_netobj(__be32 *p, struct xdr_netobj *);
void xdr_inline_pages(struct xdr_buf *, unsigned int, void xdr_inline_pages(struct xdr_buf *, unsigned int,
struct page **, unsigned int, unsigned int); struct page **, unsigned int, unsigned int);
void xdr_terminate_string(struct xdr_buf *, const u32); void xdr_terminate_string(struct xdr_buf *, const u32);
size_t xdr_buf_pagecount(struct xdr_buf *buf);
int xdr_alloc_bvec(struct xdr_buf *buf, gfp_t gfp);
void xdr_free_bvec(struct xdr_buf *buf);
static inline __be32 *xdr_encode_array(__be32 *p, const void *s, unsigned int len) static inline __be32 *xdr_encode_array(__be32 *p, const void *s, unsigned int len)
{ {
......
...@@ -141,6 +141,7 @@ struct rpc_xprt_ops { ...@@ -141,6 +141,7 @@ struct rpc_xprt_ops {
void (*connect)(struct rpc_xprt *xprt, struct rpc_task *task); void (*connect)(struct rpc_xprt *xprt, struct rpc_task *task);
int (*buf_alloc)(struct rpc_task *task); int (*buf_alloc)(struct rpc_task *task);
void (*buf_free)(struct rpc_task *task); void (*buf_free)(struct rpc_task *task);
void (*prepare_request)(struct rpc_rqst *req);
int (*send_request)(struct rpc_rqst *req); int (*send_request)(struct rpc_rqst *req);
void (*set_retrans_timeout)(struct rpc_task *task); void (*set_retrans_timeout)(struct rpc_task *task);
void (*timer)(struct rpc_xprt *xprt, struct rpc_task *task); void (*timer)(struct rpc_xprt *xprt, struct rpc_task *task);
...@@ -343,6 +344,7 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task); ...@@ -343,6 +344,7 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task); void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task);
void xprt_free_slot(struct rpc_xprt *xprt, void xprt_free_slot(struct rpc_xprt *xprt,
struct rpc_rqst *req); struct rpc_rqst *req);
void xprt_request_prepare(struct rpc_rqst *req);
bool xprt_prepare_transmit(struct rpc_task *task); bool xprt_prepare_transmit(struct rpc_task *task);
void xprt_request_enqueue_transmit(struct rpc_task *task); void xprt_request_enqueue_transmit(struct rpc_task *task);
void xprt_request_enqueue_receive(struct rpc_task *task); void xprt_request_enqueue_receive(struct rpc_task *task);
......
...@@ -1753,6 +1753,8 @@ rpc_xdr_encode(struct rpc_task *task) ...@@ -1753,6 +1753,8 @@ rpc_xdr_encode(struct rpc_task *task)
task->tk_status = rpcauth_wrap_req(task, encode, req, p, task->tk_status = rpcauth_wrap_req(task, encode, req, p,
task->tk_msg.rpc_argp); task->tk_msg.rpc_argp);
if (task->tk_status == 0)
xprt_request_prepare(req);
} }
/* /*
...@@ -1768,7 +1770,7 @@ call_encode(struct rpc_task *task) ...@@ -1768,7 +1770,7 @@ call_encode(struct rpc_task *task)
/* Did the encode result in an error condition? */ /* Did the encode result in an error condition? */
if (task->tk_status != 0) { if (task->tk_status != 0) {
/* Was the error nonfatal? */ /* Was the error nonfatal? */
if (task->tk_status == -EAGAIN) if (task->tk_status == -EAGAIN || task->tk_status == -ENOMEM)
rpc_delay(task, HZ >> 4); rpc_delay(task, HZ >> 4);
else else
rpc_exit(task, task->tk_status); rpc_exit(task, task->tk_status);
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include <linux/errno.h> #include <linux/errno.h>
#include <linux/sunrpc/xdr.h> #include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/msg_prot.h> #include <linux/sunrpc/msg_prot.h>
#include <linux/bvec.h>
/* /*
* XDR functions for basic NFS types * XDR functions for basic NFS types
...@@ -128,6 +129,39 @@ xdr_terminate_string(struct xdr_buf *buf, const u32 len) ...@@ -128,6 +129,39 @@ xdr_terminate_string(struct xdr_buf *buf, const u32 len)
} }
EXPORT_SYMBOL_GPL(xdr_terminate_string); EXPORT_SYMBOL_GPL(xdr_terminate_string);
size_t
xdr_buf_pagecount(struct xdr_buf *buf)
{
if (!buf->page_len)
return 0;
return (buf->page_base + buf->page_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
}
int
xdr_alloc_bvec(struct xdr_buf *buf, gfp_t gfp)
{
size_t i, n = xdr_buf_pagecount(buf);
if (n != 0 && buf->bvec == NULL) {
buf->bvec = kmalloc_array(n, sizeof(buf->bvec[0]), gfp);
if (!buf->bvec)
return -ENOMEM;
for (i = 0; i < n; i++) {
buf->bvec[i].bv_page = buf->pages[i];
buf->bvec[i].bv_len = PAGE_SIZE;
buf->bvec[i].bv_offset = 0;
}
}
return 0;
}
void
xdr_free_bvec(struct xdr_buf *buf)
{
kfree(buf->bvec);
buf->bvec = NULL;
}
void void
xdr_inline_pages(struct xdr_buf *xdr, unsigned int offset, xdr_inline_pages(struct xdr_buf *xdr, unsigned int offset,
struct page **pages, unsigned int base, unsigned int len) struct page **pages, unsigned int base, unsigned int len)
......
...@@ -1263,6 +1263,22 @@ xprt_request_dequeue_transmit(struct rpc_task *task) ...@@ -1263,6 +1263,22 @@ xprt_request_dequeue_transmit(struct rpc_task *task)
spin_unlock(&xprt->queue_lock); spin_unlock(&xprt->queue_lock);
} }
/**
* xprt_request_prepare - prepare an encoded request for transport
* @req: pointer to rpc_rqst
*
* Calls into the transport layer to do whatever is needed to prepare
* the request for transmission or receive.
*/
void
xprt_request_prepare(struct rpc_rqst *req)
{
struct rpc_xprt *xprt = req->rq_xprt;
if (xprt->ops->prepare_request)
xprt->ops->prepare_request(req);
}
/** /**
* xprt_request_need_retransmit - Test if a task needs retransmission * xprt_request_need_retransmit - Test if a task needs retransmission
* @task: pointer to rpc_task * @task: pointer to rpc_task
...@@ -1727,6 +1743,7 @@ void xprt_release(struct rpc_task *task) ...@@ -1727,6 +1743,7 @@ void xprt_release(struct rpc_task *task)
if (req->rq_buffer) if (req->rq_buffer)
xprt->ops->buf_free(task); xprt->ops->buf_free(task);
xprt_inject_disconnect(xprt); xprt_inject_disconnect(xprt);
xdr_free_bvec(&req->rq_rcv_buf);
if (req->rq_cred != NULL) if (req->rq_cred != NULL)
put_rpccred(req->rq_cred); put_rpccred(req->rq_cred);
task->tk_rqstp = NULL; task->tk_rqstp = NULL;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment