Commit 7ebbbc6e authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Simplify identification of when the message send/receive is complete

Add states to indicate that the message send and receive are not yet
complete.
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@hammerspace.com>
parent 3021a5bb
...@@ -140,8 +140,10 @@ struct rpc_task_setup { ...@@ -140,8 +140,10 @@ struct rpc_task_setup {
#define RPC_TASK_RUNNING 0 #define RPC_TASK_RUNNING 0
#define RPC_TASK_QUEUED 1 #define RPC_TASK_QUEUED 1
#define RPC_TASK_ACTIVE 2 #define RPC_TASK_ACTIVE 2
#define RPC_TASK_MSG_RECV 3 #define RPC_TASK_NEED_XMIT 3
#define RPC_TASK_MSG_RECV_WAIT 4 #define RPC_TASK_NEED_RECV 4
#define RPC_TASK_MSG_RECV 5
#define RPC_TASK_MSG_RECV_WAIT 6
#define RPC_IS_RUNNING(t) test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate) #define RPC_IS_RUNNING(t) test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
#define rpc_set_running(t) set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate) #define rpc_set_running(t) set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
......
...@@ -1156,6 +1156,7 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req) ...@@ -1156,6 +1156,7 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
*/ */
xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
xbufp->tail[0].iov_len; xbufp->tail[0].iov_len;
set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
task->tk_action = call_bc_transmit; task->tk_action = call_bc_transmit;
atomic_inc(&task->tk_count); atomic_inc(&task->tk_count);
...@@ -1720,17 +1721,10 @@ call_allocate(struct rpc_task *task) ...@@ -1720,17 +1721,10 @@ call_allocate(struct rpc_task *task)
rpc_exit(task, -ERESTARTSYS); rpc_exit(task, -ERESTARTSYS);
} }
static inline int static int
rpc_task_need_encode(struct rpc_task *task) rpc_task_need_encode(struct rpc_task *task)
{ {
return task->tk_rqstp->rq_snd_buf.len == 0; return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0;
}
static inline void
rpc_task_force_reencode(struct rpc_task *task)
{
task->tk_rqstp->rq_snd_buf.len = 0;
task->tk_rqstp->rq_bytes_sent = 0;
} }
/* /*
...@@ -1765,6 +1759,8 @@ rpc_xdr_encode(struct rpc_task *task) ...@@ -1765,6 +1759,8 @@ rpc_xdr_encode(struct rpc_task *task)
task->tk_status = rpcauth_wrap_req(task, encode, req, p, task->tk_status = rpcauth_wrap_req(task, encode, req, p,
task->tk_msg.rpc_argp); task->tk_msg.rpc_argp);
if (task->tk_status == 0)
set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
} }
/* /*
...@@ -1999,7 +1995,6 @@ call_transmit_status(struct rpc_task *task) ...@@ -1999,7 +1995,6 @@ call_transmit_status(struct rpc_task *task)
*/ */
if (task->tk_status == 0) { if (task->tk_status == 0) {
xprt_end_transmit(task); xprt_end_transmit(task);
rpc_task_force_reencode(task);
return; return;
} }
...@@ -2010,7 +2005,6 @@ call_transmit_status(struct rpc_task *task) ...@@ -2010,7 +2005,6 @@ call_transmit_status(struct rpc_task *task)
default: default:
dprint_status(task); dprint_status(task);
xprt_end_transmit(task); xprt_end_transmit(task);
rpc_task_force_reencode(task);
break; break;
/* /*
* Special cases: if we've been waiting on the * Special cases: if we've been waiting on the
...@@ -2038,7 +2032,7 @@ call_transmit_status(struct rpc_task *task) ...@@ -2038,7 +2032,7 @@ call_transmit_status(struct rpc_task *task)
case -EADDRINUSE: case -EADDRINUSE:
case -ENOTCONN: case -ENOTCONN:
case -EPIPE: case -EPIPE:
rpc_task_force_reencode(task); break;
} }
} }
...@@ -2185,6 +2179,7 @@ call_status(struct rpc_task *task) ...@@ -2185,6 +2179,7 @@ call_status(struct rpc_task *task)
rpc_exit(task, status); rpc_exit(task, status);
break; break;
case -EBADMSG: case -EBADMSG:
clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
task->tk_action = call_transmit; task->tk_action = call_transmit;
break; break;
default: default:
......
...@@ -936,10 +936,18 @@ void xprt_complete_rqst(struct rpc_task *task, int copied) ...@@ -936,10 +936,18 @@ void xprt_complete_rqst(struct rpc_task *task, int copied)
/* req->rq_reply_bytes_recvd */ /* req->rq_reply_bytes_recvd */
smp_wmb(); smp_wmb();
req->rq_reply_bytes_recvd = copied; req->rq_reply_bytes_recvd = copied;
clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
rpc_wake_up_queued_task(&xprt->pending, task); rpc_wake_up_queued_task(&xprt->pending, task);
} }
EXPORT_SYMBOL_GPL(xprt_complete_rqst); EXPORT_SYMBOL_GPL(xprt_complete_rqst);
static bool
xprt_request_data_received(struct rpc_task *task)
{
return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
task->tk_rqstp->rq_reply_bytes_recvd != 0;
}
static void xprt_timer(struct rpc_task *task) static void xprt_timer(struct rpc_task *task)
{ {
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
...@@ -1031,12 +1039,13 @@ void xprt_transmit(struct rpc_task *task) ...@@ -1031,12 +1039,13 @@ void xprt_transmit(struct rpc_task *task)
/* Add request to the receive list */ /* Add request to the receive list */
spin_lock(&xprt->recv_lock); spin_lock(&xprt->recv_lock);
list_add_tail(&req->rq_list, &xprt->recv); list_add_tail(&req->rq_list, &xprt->recv);
set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
spin_unlock(&xprt->recv_lock); spin_unlock(&xprt->recv_lock);
xprt_reset_majortimeo(req); xprt_reset_majortimeo(req);
/* Turn off autodisconnect */ /* Turn off autodisconnect */
del_singleshot_timer_sync(&xprt->timer); del_singleshot_timer_sync(&xprt->timer);
} }
} else if (!req->rq_bytes_sent) } else if (xprt_request_data_received(task) && !req->rq_bytes_sent)
return; return;
connect_cookie = xprt->connect_cookie; connect_cookie = xprt->connect_cookie;
...@@ -1046,9 +1055,11 @@ void xprt_transmit(struct rpc_task *task) ...@@ -1046,9 +1055,11 @@ void xprt_transmit(struct rpc_task *task)
task->tk_status = status; task->tk_status = status;
return; return;
} }
xprt_inject_disconnect(xprt); xprt_inject_disconnect(xprt);
dprintk("RPC: %5u xmit complete\n", task->tk_pid); dprintk("RPC: %5u xmit complete\n", task->tk_pid);
clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
task->tk_flags |= RPC_TASK_SENT; task->tk_flags |= RPC_TASK_SENT;
spin_lock_bh(&xprt->transport_lock); spin_lock_bh(&xprt->transport_lock);
...@@ -1062,14 +1073,14 @@ void xprt_transmit(struct rpc_task *task) ...@@ -1062,14 +1073,14 @@ void xprt_transmit(struct rpc_task *task)
spin_unlock_bh(&xprt->transport_lock); spin_unlock_bh(&xprt->transport_lock);
req->rq_connect_cookie = connect_cookie; req->rq_connect_cookie = connect_cookie;
if (rpc_reply_expected(task) && !READ_ONCE(req->rq_reply_bytes_recvd)) { if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
/* /*
* Sleep on the pending queue if we're expecting a reply. * Sleep on the pending queue if we're expecting a reply.
* The spinlock ensures atomicity between the test of * The spinlock ensures atomicity between the test of
* req->rq_reply_bytes_recvd, and the call to rpc_sleep_on(). * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
*/ */
spin_lock(&xprt->recv_lock); spin_lock(&xprt->recv_lock);
if (!req->rq_reply_bytes_recvd) { if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
rpc_sleep_on(&xprt->pending, task, xprt_timer); rpc_sleep_on(&xprt->pending, task, xprt_timer);
/* /*
* Send an extra queue wakeup call if the * Send an extra queue wakeup call if the
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment