Commit 5fe6eaa1 authored by Chuck Lever, committed by Anna Schumaker

SUNRPC: Generalize the RPC buffer allocation API

xprtrdma needs to allocate the Call and Reply buffers separately.
To be honest, the reliance on a single buffer for the pair of
XDR buffers is transport implementation-specific.

Transports that want to allocate separate Call and Reply buffers
will ignore the "size" argument anyway.  Don't bother passing it.

The buf_alloc method can't return two pointers. Instead, make the
method's return value an error code, and set the rq_buffer pointer
in the method itself.
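
Under the new contract, a ->buf_alloc method fills in rq_buffer itself
and reports status through its return value. A minimal sketch of a
conforming implementation, assuming a plain kmalloc-backed transport
(illustrative only, not part of this patch):

	static int example_buf_alloc(struct rpc_task *task)
	{
		struct rpc_rqst *rqst = task->tk_rqstp;
		size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
		void *buf;

		/* rpciod context: never sleep waiting for memory */
		buf = kmalloc(size, GFP_NOIO | __GFP_NOWARN);
		if (!buf)
			return -ENOMEM;	/* transient; caller may retry */
		rqst->rq_buffer = buf;
		return 0;
	}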

This gives call_allocate an opportunity to terminate an RPC instead
of looping forever when a permanent problem occurs. If a request is
just bogus, or the transport is in a state where it can't allocate
resources for any request, there needs to be a way to kill the RPC
right there and not loop.
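
In rough terms (a sketch mirroring the clnt.c hunk below), the caller
can now distinguish transient from permanent failures:

	status = xprt->ops->buf_alloc(task);
	if (status == 0)
		return;			/* rq_buffer is ready */
	if (status != -ENOMEM) {
		rpc_exit(task, status);	/* permanent error: stop looping */
		return;
	}
	/* -ENOMEM is transient: wait and retry the allocation */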

This immediately fixes a rare problem in the backchannel send path,
which loops if the server happens to send a CB request whose
call+reply size is larger than a page (which it shouldn't do yet).

One more issue: it looks like xprt_inject_disconnect was incorrectly
placed in the failure path in call_allocate. It needs to be in the
success path, as it is at other call sites.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
parent b9c5bc03
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -239,7 +239,7 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *,
 					void *);
 void		rpc_wake_up_status(struct rpc_wait_queue *, int);
 void		rpc_delay(struct rpc_task *, unsigned long);
-void *		rpc_malloc(struct rpc_task *, size_t);
+int		rpc_malloc(struct rpc_task *);
 void		rpc_free(void *);
 int		rpciod_up(void);
 void		rpciod_down(void);
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -127,7 +127,7 @@ struct rpc_xprt_ops {
 	void		(*rpcbind)(struct rpc_task *task);
 	void		(*set_port)(struct rpc_xprt *xprt, unsigned short port);
 	void		(*connect)(struct rpc_xprt *xprt, struct rpc_task *task);
-	void *		(*buf_alloc)(struct rpc_task *task, size_t size);
+	int		(*buf_alloc)(struct rpc_task *task);
 	void		(*buf_free)(void *buffer);
 	int		(*send_request)(struct rpc_task *task);
 	void		(*set_retrans_timeout)(struct rpc_task *task);
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1691,6 +1691,7 @@ call_allocate(struct rpc_task *task)
 	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_xprt *xprt = req->rq_xprt;
 	struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
+	int status;
 
 	dprint_status(task);
@@ -1716,11 +1717,14 @@ call_allocate(struct rpc_task *task)
 	req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
 	req->rq_rcvsize <<= 2;
 
-	req->rq_buffer = xprt->ops->buf_alloc(task,
-					req->rq_callsize + req->rq_rcvsize);
-	if (req->rq_buffer != NULL)
-		return;
+	status = xprt->ops->buf_alloc(task);
 	xprt_inject_disconnect(xprt);
+	if (status == 0)
+		return;
+	if (status != -ENOMEM) {
+		rpc_exit(task, status);
+		return;
+	}
 
 	dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -849,14 +849,17 @@ static void rpc_async_schedule(struct work_struct *work)
 }
 
 /**
- * rpc_malloc - allocate an RPC buffer
- * @task: RPC task that will use this buffer
- * @size: requested byte size
+ * rpc_malloc - allocate RPC buffer resources
+ * @task: RPC task
+ *
+ * A single memory region is allocated, which is split between the
+ * RPC call and RPC reply that this task is being used for. When
+ * this RPC is retired, the memory is released by calling rpc_free.
  *
  * To prevent rpciod from hanging, this allocator never sleeps,
- * returning NULL and suppressing warning if the request cannot be serviced
- * immediately.
- * The caller can arrange to sleep in a way that is safe for rpciod.
+ * returning -ENOMEM and suppressing warning if the request cannot
+ * be serviced immediately. The caller can arrange to sleep in a
+ * way that is safe for rpciod.
  *
  * Most requests are 'small' (under 2KiB) and can be serviced from a
  * mempool, ensuring that NFS reads and writes can always proceed,
@@ -865,8 +868,10 @@ static void rpc_async_schedule(struct work_struct *work)
  * In order to avoid memory starvation triggering more writebacks of
  * NFS requests, we avoid using GFP_KERNEL.
  */
-void *rpc_malloc(struct rpc_task *task, size_t size)
+int rpc_malloc(struct rpc_task *task)
 {
+	struct rpc_rqst *rqst = task->tk_rqstp;
+	size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
 	struct rpc_buffer *buf;
 	gfp_t gfp = GFP_NOIO | __GFP_NOWARN;
 
@@ -880,12 +885,13 @@ void *rpc_malloc(struct rpc_task *task, size_t size)
 
 	buf = kmalloc(size, gfp);
 	if (!buf)
-		return NULL;
+		return -ENOMEM;
 
 	buf->len = size;
 	dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
 			task->tk_pid, size, buf);
-	return &buf->data;
+	rqst->rq_buffer = buf->data;
+	return 0;
 }
 EXPORT_SYMBOL_GPL(rpc_malloc);
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -159,29 +159,30 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 /* Server-side transport endpoint wants a whole page for its send
  * buffer. The client RPC code constructs the RPC header in this
  * buffer before it invokes ->send_request.
- *
- * Returns NULL if there was a temporary allocation failure.
  */
-static void *
-xprt_rdma_bc_allocate(struct rpc_task *task, size_t size)
+static int
+xprt_rdma_bc_allocate(struct rpc_task *task)
 {
 	struct rpc_rqst *rqst = task->tk_rqstp;
 	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
+	size_t size = rqst->rq_callsize;
 	struct svcxprt_rdma *rdma;
 	struct page *page;
 
 	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
 
-	/* Prevent an infinite loop: try to make this case work */
-	if (size > PAGE_SIZE)
+	if (size > PAGE_SIZE) {
 		WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
 			  size);
+		return -EINVAL;
+	}
 
 	page = alloc_page(RPCRDMA_DEF_GFP);
 	if (!page)
-		return NULL;
+		return -ENOMEM;
 
-	return page_address(page);
+	rqst->rq_buffer = page_address(page);
+	return 0;
 }
 
 static void
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -477,7 +477,15 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 	}
 }
 
-/*
+/**
+ * xprt_rdma_allocate - allocate transport resources for an RPC
+ * @task: RPC task
+ *
+ * Return values:
+ *        0:	Success; rq_buffer points to RPC buffer to use
+ *   ENOMEM:	Out of memory, call again later
+ *      EIO:	A permanent error occurred, do not retry
+ *
  * The RDMA allocate/free functions need the task structure as a place
  * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
  * sequence.
@@ -486,11 +494,12 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
  * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
  * We may register rq_rcv_buf when using reply chunks.
  */
-static void *
-xprt_rdma_allocate(struct rpc_task *task, size_t size)
+static int
+xprt_rdma_allocate(struct rpc_task *task)
 {
-	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpc_rqst *rqst = task->tk_rqstp;
+	size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 	struct rpcrdma_regbuf *rb;
 	struct rpcrdma_req *req;
 	size_t min_size;
@@ -498,7 +507,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
 
 	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 	if (req == NULL)
-		return NULL;
+		return -ENOMEM;
 
 	flags = RPCRDMA_DEF_GFP;
 	if (RPC_IS_SWAPPER(task))
@@ -515,7 +524,8 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
 	dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
 	req->rl_connect_cookie = 0;	/* our reserved value */
 	req->rl_task = task;
-	return req->rl_sendbuf->rg_base;
+	rqst->rq_buffer = req->rl_sendbuf->rg_base;
+	return 0;
 
 out_rdmabuf:
 	min_size = r_xprt->rx_data.inline_wsize;
@@ -558,7 +568,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
 
 out_fail:
 	rpcrdma_buffer_put(req);
-	return NULL;
+	return -ENOMEM;
 }
 
 /*
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -2533,23 +2533,28 @@ static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
 * we allocate pages instead doing a kmalloc like rpc_malloc is because we want
 * to use the server side send routines.
 */
-static void *bc_malloc(struct rpc_task *task, size_t size)
+static int bc_malloc(struct rpc_task *task)
 {
+	struct rpc_rqst *rqst = task->tk_rqstp;
+	size_t size = rqst->rq_callsize;
 	struct page *page;
 	struct rpc_buffer *buf;
 
-	WARN_ON_ONCE(size > PAGE_SIZE - sizeof(struct rpc_buffer));
-	if (size > PAGE_SIZE - sizeof(struct rpc_buffer))
-		return NULL;
+	if (size > PAGE_SIZE - sizeof(struct rpc_buffer)) {
+		WARN_ONCE(1, "xprtsock: large bc buffer request (size %zu)\n",
+			  size);
+		return -EINVAL;
+	}
 
 	page = alloc_page(GFP_KERNEL);
 	if (!page)
-		return NULL;
+		return -ENOMEM;
 
 	buf = page_address(page);
 	buf->len = PAGE_SIZE;
 
-	return buf->data;
+	rqst->rq_buffer = buf->data;
+	return 0;
 }
 
 /*