Commit e877a88d authored by NeilBrown's avatar NeilBrown Committed by Trond Myklebust

SUNRPC in case of backlog, hand free slots directly to waiting task

If sunrpc.tcp_max_slot_table_entries is small and there are tasks
on the backlog queue, then when a request completes it is freed and the
first task on the queue is woken.  The expectation is that it will wake
and claim that request.  However if it was a sync task and the waiting
process was killed at just that moment, it will wake and NOT claim the
request.

As long as TASK_CONGESTED remains set, requests can only be claimed by
tasks woken from the backlog, and they are woken only as requests are
freed, so when a task doesn't claim a request, no other task can ever
get that request until TASK_CONGESTED is cleared.  Each time this
happens the number of available requests is decreased by one.

With a sufficiently high workload and sufficiently low setting of
max_slot (16 in the case where this was seen), TASK_CONGESTED can remain
set for an extended period, and the above scenario (of a process being
killed just as its task was woken) can repeat until no requests can be
allocated.  Then traffic stops.

This patch addresses the problem by introducing a positive handover of a
request from a completing task to a backlog task - the request is never
freed when there is a backlog.

When a task is woken it might not already have a request attached in
which case it is *not* freed (as with current code) but is initialised
(if needed) and used.  If it isn't used it will eventually be freed by
rpc_exit_task().  xprt_release() is enhanced to be able to correctly
release an uninitialised request.

Fixes: ba60eb25 ("SUNRPC: Fix a livelock problem in the xprt->backlog queue")
Signed-off-by: default avatarNeilBrown <neilb@suse.de>
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@hammerspace.com>
parent d1d97395
...@@ -1677,13 +1677,6 @@ call_reserveresult(struct rpc_task *task) ...@@ -1677,13 +1677,6 @@ call_reserveresult(struct rpc_task *task)
return; return;
} }
/*
* Even though there was an error, we may have acquired
* a request slot somehow. Make sure not to leak it.
*/
if (task->tk_rqstp)
xprt_release(task);
switch (status) { switch (status) {
case -ENOMEM: case -ENOMEM:
rpc_delay(task, HZ >> 2); rpc_delay(task, HZ >> 2);
......
...@@ -70,6 +70,7 @@ ...@@ -70,6 +70,7 @@
static void xprt_init(struct rpc_xprt *xprt, struct net *net); static void xprt_init(struct rpc_xprt *xprt, struct net *net);
static __be32 xprt_alloc_xid(struct rpc_xprt *xprt); static __be32 xprt_alloc_xid(struct rpc_xprt *xprt);
static void xprt_destroy(struct rpc_xprt *xprt); static void xprt_destroy(struct rpc_xprt *xprt);
static void xprt_request_init(struct rpc_task *task);
static DEFINE_SPINLOCK(xprt_list_lock); static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list); static LIST_HEAD(xprt_list);
...@@ -1612,10 +1613,26 @@ static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) ...@@ -1612,10 +1613,26 @@ static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
rpc_sleep_on(&xprt->backlog, task, NULL); rpc_sleep_on(&xprt->backlog, task, NULL);
} }
static void xprt_wake_up_backlog(struct rpc_xprt *xprt) static bool __xprt_set_rq(struct rpc_task *task, void *data)
{ {
if (rpc_wake_up_next(&xprt->backlog) == NULL) struct rpc_rqst *req = data;
if (task->tk_rqstp == NULL) {
memset(req, 0, sizeof(*req)); /* mark unused */
task->tk_status = -EAGAIN;
task->tk_rqstp = req;
return true;
}
return false;
}
static bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
clear_bit(XPRT_CONGESTED, &xprt->state); clear_bit(XPRT_CONGESTED, &xprt->state);
return false;
}
return true;
} }
static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
...@@ -1703,11 +1720,11 @@ EXPORT_SYMBOL_GPL(xprt_alloc_slot); ...@@ -1703,11 +1720,11 @@ EXPORT_SYMBOL_GPL(xprt_alloc_slot);
void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{ {
spin_lock(&xprt->reserve_lock); spin_lock(&xprt->reserve_lock);
if (!xprt_dynamic_free_slot(xprt, req)) { if (!xprt_wake_up_backlog(xprt, req) &&
!xprt_dynamic_free_slot(xprt, req)) {
memset(req, 0, sizeof(*req)); /* mark unused */ memset(req, 0, sizeof(*req)); /* mark unused */
list_add(&req->rq_list, &xprt->free); list_add(&req->rq_list, &xprt->free);
} }
xprt_wake_up_backlog(xprt);
spin_unlock(&xprt->reserve_lock); spin_unlock(&xprt->reserve_lock);
} }
EXPORT_SYMBOL_GPL(xprt_free_slot); EXPORT_SYMBOL_GPL(xprt_free_slot);
...@@ -1795,6 +1812,10 @@ xprt_request_init(struct rpc_task *task) ...@@ -1795,6 +1812,10 @@ xprt_request_init(struct rpc_task *task)
struct rpc_xprt *xprt = task->tk_xprt; struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
if (req->rq_task)
/* Already initialized */
return;
req->rq_task = task; req->rq_task = task;
req->rq_xprt = xprt; req->rq_xprt = xprt;
req->rq_buffer = NULL; req->rq_buffer = NULL;
...@@ -1855,8 +1876,10 @@ void xprt_retry_reserve(struct rpc_task *task) ...@@ -1855,8 +1876,10 @@ void xprt_retry_reserve(struct rpc_task *task)
struct rpc_xprt *xprt = task->tk_xprt; struct rpc_xprt *xprt = task->tk_xprt;
task->tk_status = 0; task->tk_status = 0;
if (task->tk_rqstp != NULL) if (task->tk_rqstp != NULL) {
xprt_request_init(task);
return; return;
}
task->tk_status = -EAGAIN; task->tk_status = -EAGAIN;
xprt_do_reserve(xprt, task); xprt_do_reserve(xprt, task);
...@@ -1881,6 +1904,7 @@ void xprt_release(struct rpc_task *task) ...@@ -1881,6 +1904,7 @@ void xprt_release(struct rpc_task *task)
} }
xprt = req->rq_xprt; xprt = req->rq_xprt;
if (xprt) {
xprt_request_dequeue_xprt(task); xprt_request_dequeue_xprt(task);
spin_lock(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
xprt->ops->release_xprt(xprt, task); xprt->ops->release_xprt(xprt, task);
...@@ -1894,10 +1918,12 @@ void xprt_release(struct rpc_task *task) ...@@ -1894,10 +1918,12 @@ void xprt_release(struct rpc_task *task)
xdr_free_bvec(&req->rq_snd_buf); xdr_free_bvec(&req->rq_snd_buf);
if (req->rq_cred != NULL) if (req->rq_cred != NULL)
put_rpccred(req->rq_cred); put_rpccred(req->rq_cred);
task->tk_rqstp = NULL;
if (req->rq_release_snd_buf) if (req->rq_release_snd_buf)
req->rq_release_snd_buf(req); req->rq_release_snd_buf(req);
} else
xprt = task->tk_xprt;
task->tk_rqstp = NULL;
if (likely(!bc_prealloc(req))) if (likely(!bc_prealloc(req)))
xprt->ops->free_slot(xprt, req); xprt->ops->free_slot(xprt, req);
else else
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment