Commit e86be3a0 authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: More fixes for backlog congestion

Ensure that we fix the XPRT_CONGESTED starvation issue for RDMA as well
as socket based transports.
Ensure we always initialise the request after waking up from the backlog
list.

Fixes: e877a88d ("SUNRPC in case of backlog, hand free slots directly to waiting task")
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@hammerspace.com>
parent d275880a
...@@ -368,6 +368,8 @@ struct rpc_xprt * xprt_alloc(struct net *net, size_t size, ...@@ -368,6 +368,8 @@ struct rpc_xprt * xprt_alloc(struct net *net, size_t size,
unsigned int num_prealloc, unsigned int num_prealloc,
unsigned int max_req); unsigned int max_req);
void xprt_free(struct rpc_xprt *); void xprt_free(struct rpc_xprt *);
void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task);
bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req);
static inline int static inline int
xprt_enable_swap(struct rpc_xprt *xprt) xprt_enable_swap(struct rpc_xprt *xprt)
......
...@@ -1607,11 +1607,18 @@ xprt_transmit(struct rpc_task *task) ...@@ -1607,11 +1607,18 @@ xprt_transmit(struct rpc_task *task)
spin_unlock(&xprt->queue_lock); spin_unlock(&xprt->queue_lock);
} }
static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) static void xprt_complete_request_init(struct rpc_task *task)
{
if (task->tk_rqstp)
xprt_request_init(task);
}
void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
{ {
set_bit(XPRT_CONGESTED, &xprt->state); set_bit(XPRT_CONGESTED, &xprt->state);
rpc_sleep_on(&xprt->backlog, task, NULL); rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
} }
EXPORT_SYMBOL_GPL(xprt_add_backlog);
static bool __xprt_set_rq(struct rpc_task *task, void *data) static bool __xprt_set_rq(struct rpc_task *task, void *data)
{ {
...@@ -1619,14 +1626,13 @@ static bool __xprt_set_rq(struct rpc_task *task, void *data) ...@@ -1619,14 +1626,13 @@ static bool __xprt_set_rq(struct rpc_task *task, void *data)
if (task->tk_rqstp == NULL) { if (task->tk_rqstp == NULL) {
memset(req, 0, sizeof(*req)); /* mark unused */ memset(req, 0, sizeof(*req)); /* mark unused */
task->tk_status = -EAGAIN;
task->tk_rqstp = req; task->tk_rqstp = req;
return true; return true;
} }
return false; return false;
} }
static bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req) bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
{ {
if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) { if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
clear_bit(XPRT_CONGESTED, &xprt->state); clear_bit(XPRT_CONGESTED, &xprt->state);
...@@ -1634,6 +1640,7 @@ static bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req) ...@@ -1634,6 +1640,7 @@ static bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
} }
return true; return true;
} }
EXPORT_SYMBOL_GPL(xprt_wake_up_backlog);
static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
{ {
...@@ -1643,7 +1650,7 @@ static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task ...@@ -1643,7 +1650,7 @@ static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task
goto out; goto out;
spin_lock(&xprt->reserve_lock); spin_lock(&xprt->reserve_lock);
if (test_bit(XPRT_CONGESTED, &xprt->state)) { if (test_bit(XPRT_CONGESTED, &xprt->state)) {
rpc_sleep_on(&xprt->backlog, task, NULL); xprt_add_backlog(xprt, task);
ret = true; ret = true;
} }
spin_unlock(&xprt->reserve_lock); spin_unlock(&xprt->reserve_lock);
...@@ -1812,10 +1819,6 @@ xprt_request_init(struct rpc_task *task) ...@@ -1812,10 +1819,6 @@ xprt_request_init(struct rpc_task *task)
struct rpc_xprt *xprt = task->tk_xprt; struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
if (req->rq_task)
/* Already initialized */
return;
req->rq_task = task; req->rq_task = task;
req->rq_xprt = xprt; req->rq_xprt = xprt;
req->rq_buffer = NULL; req->rq_buffer = NULL;
...@@ -1876,10 +1879,8 @@ void xprt_retry_reserve(struct rpc_task *task) ...@@ -1876,10 +1879,8 @@ void xprt_retry_reserve(struct rpc_task *task)
struct rpc_xprt *xprt = task->tk_xprt; struct rpc_xprt *xprt = task->tk_xprt;
task->tk_status = 0; task->tk_status = 0;
if (task->tk_rqstp != NULL) { if (task->tk_rqstp != NULL)
xprt_request_init(task);
return; return;
}
task->tk_status = -EAGAIN; task->tk_status = -EAGAIN;
xprt_do_reserve(xprt, task); xprt_do_reserve(xprt, task);
...@@ -1904,7 +1905,6 @@ void xprt_release(struct rpc_task *task) ...@@ -1904,7 +1905,6 @@ void xprt_release(struct rpc_task *task)
} }
xprt = req->rq_xprt; xprt = req->rq_xprt;
if (xprt) {
xprt_request_dequeue_xprt(task); xprt_request_dequeue_xprt(task);
spin_lock(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
xprt->ops->release_xprt(xprt, task); xprt->ops->release_xprt(xprt, task);
...@@ -1920,8 +1920,6 @@ void xprt_release(struct rpc_task *task) ...@@ -1920,8 +1920,6 @@ void xprt_release(struct rpc_task *task)
put_rpccred(req->rq_cred); put_rpccred(req->rq_cred);
if (req->rq_release_snd_buf) if (req->rq_release_snd_buf)
req->rq_release_snd_buf(req); req->rq_release_snd_buf(req);
} else
xprt = task->tk_xprt;
task->tk_rqstp = NULL; task->tk_rqstp = NULL;
if (likely(!bc_prealloc(req))) if (likely(!bc_prealloc(req)))
......
...@@ -520,9 +520,8 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) ...@@ -520,9 +520,8 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
return; return;
out_sleep: out_sleep:
set_bit(XPRT_CONGESTED, &xprt->state);
rpc_sleep_on(&xprt->backlog, task, NULL);
task->tk_status = -EAGAIN; task->tk_status = -EAGAIN;
xprt_add_backlog(xprt, task);
} }
/** /**
...@@ -537,10 +536,11 @@ xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst) ...@@ -537,10 +536,11 @@ xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
struct rpcrdma_xprt *r_xprt = struct rpcrdma_xprt *r_xprt =
container_of(xprt, struct rpcrdma_xprt, rx_xprt); container_of(xprt, struct rpcrdma_xprt, rx_xprt);
rpcrdma_reply_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
if (!xprt_wake_up_backlog(xprt, rqst)) {
memset(rqst, 0, sizeof(*rqst)); memset(rqst, 0, sizeof(*rqst));
rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst)); rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
if (unlikely(!rpc_wake_up_next(&xprt->backlog))) }
clear_bit(XPRT_CONGESTED, &xprt->state);
} }
static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt, static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
......
...@@ -1200,6 +1200,20 @@ rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt) ...@@ -1200,6 +1200,20 @@ rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
return mr; return mr;
} }
/**
* rpcrdma_reply_put - Put reply buffers back into pool
* @buffers: buffer pool
* @req: object to return
*
*/
void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
if (req->rl_reply) {
rpcrdma_rep_put(buffers, req->rl_reply);
req->rl_reply = NULL;
}
}
/** /**
* rpcrdma_buffer_get - Get a request buffer * rpcrdma_buffer_get - Get a request buffer
* @buffers: Buffer pool from which to obtain a buffer * @buffers: Buffer pool from which to obtain a buffer
...@@ -1228,9 +1242,7 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) ...@@ -1228,9 +1242,7 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
*/ */
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req) void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{ {
if (req->rl_reply) rpcrdma_reply_put(buffers, req);
rpcrdma_rep_put(buffers, req->rl_reply);
req->rl_reply = NULL;
spin_lock(&buffers->rb_lock); spin_lock(&buffers->rb_lock);
list_add(&req->rl_list, &buffers->rb_send_bufs); list_add(&req->rl_list, &buffers->rb_send_bufs);
......
...@@ -479,6 +479,7 @@ struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *); ...@@ -479,6 +479,7 @@ struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers,
struct rpcrdma_req *req); struct rpcrdma_req *req);
void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep); void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep);
void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req);
bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size,
gfp_t flags); gfp_t flags);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment