Commit 762e4e67 authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Refactor RPC call encoding

Move the call encoding so that it occurs before the transport connection
etc.
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@hammerspace.com>
parent 944b0429
...@@ -347,6 +347,7 @@ bool xprt_prepare_transmit(struct rpc_task *task); ...@@ -347,6 +347,7 @@ bool xprt_prepare_transmit(struct rpc_task *task);
void xprt_request_enqueue_transmit(struct rpc_task *task); void xprt_request_enqueue_transmit(struct rpc_task *task);
void xprt_request_enqueue_receive(struct rpc_task *task); void xprt_request_enqueue_receive(struct rpc_task *task);
void xprt_request_wait_receive(struct rpc_task *task); void xprt_request_wait_receive(struct rpc_task *task);
bool xprt_request_need_retransmit(struct rpc_task *task);
void xprt_transmit(struct rpc_task *task); void xprt_transmit(struct rpc_task *task);
void xprt_end_transmit(struct rpc_task *task); void xprt_end_transmit(struct rpc_task *task);
int xprt_adjust_timeout(struct rpc_rqst *req); int xprt_adjust_timeout(struct rpc_rqst *req);
......
...@@ -61,6 +61,7 @@ static void call_start(struct rpc_task *task); ...@@ -61,6 +61,7 @@ static void call_start(struct rpc_task *task);
static void call_reserve(struct rpc_task *task); static void call_reserve(struct rpc_task *task);
static void call_reserveresult(struct rpc_task *task); static void call_reserveresult(struct rpc_task *task);
static void call_allocate(struct rpc_task *task); static void call_allocate(struct rpc_task *task);
static void call_encode(struct rpc_task *task);
static void call_decode(struct rpc_task *task); static void call_decode(struct rpc_task *task);
static void call_bind(struct rpc_task *task); static void call_bind(struct rpc_task *task);
static void call_bind_status(struct rpc_task *task); static void call_bind_status(struct rpc_task *task);
...@@ -1140,7 +1141,8 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req) ...@@ -1140,7 +1141,8 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
struct xdr_buf *xbufp = &req->rq_snd_buf; struct xdr_buf *xbufp = &req->rq_snd_buf;
struct rpc_task_setup task_setup_data = { struct rpc_task_setup task_setup_data = {
.callback_ops = &rpc_default_ops, .callback_ops = &rpc_default_ops,
.flags = RPC_TASK_SOFTCONN, .flags = RPC_TASK_SOFTCONN |
RPC_TASK_NO_RETRANS_TIMEOUT,
}; };
dprintk("RPC: rpc_run_bc_task req= %p\n", req); dprintk("RPC: rpc_run_bc_task req= %p\n", req);
...@@ -1160,7 +1162,6 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req) ...@@ -1160,7 +1162,6 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
task->tk_action = call_bc_transmit; task->tk_action = call_bc_transmit;
atomic_inc(&task->tk_count); atomic_inc(&task->tk_count);
WARN_ON_ONCE(atomic_read(&task->tk_count) != 2); WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
xprt_request_enqueue_transmit(task);
rpc_execute(task); rpc_execute(task);
dprintk("RPC: rpc_run_bc_task: task= %p\n", task); dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
...@@ -1680,7 +1681,7 @@ call_allocate(struct rpc_task *task) ...@@ -1680,7 +1681,7 @@ call_allocate(struct rpc_task *task)
dprint_status(task); dprint_status(task);
task->tk_status = 0; task->tk_status = 0;
task->tk_action = call_bind; task->tk_action = call_encode;
if (req->rq_buffer) if (req->rq_buffer)
return; return;
...@@ -1724,12 +1725,12 @@ call_allocate(struct rpc_task *task) ...@@ -1724,12 +1725,12 @@ call_allocate(struct rpc_task *task)
static int static int
rpc_task_need_encode(struct rpc_task *task) rpc_task_need_encode(struct rpc_task *task)
{ {
return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0; return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0 &&
(!(task->tk_flags & RPC_TASK_SENT) ||
!(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
xprt_request_need_retransmit(task));
} }
/*
* 3. Encode arguments of an RPC call
*/
static void static void
rpc_xdr_encode(struct rpc_task *task) rpc_xdr_encode(struct rpc_task *task)
{ {
...@@ -1745,6 +1746,7 @@ rpc_xdr_encode(struct rpc_task *task) ...@@ -1745,6 +1746,7 @@ rpc_xdr_encode(struct rpc_task *task)
xdr_buf_init(&req->rq_rcv_buf, xdr_buf_init(&req->rq_rcv_buf,
req->rq_rbuffer, req->rq_rbuffer,
req->rq_rcvsize); req->rq_rcvsize);
req->rq_bytes_sent = 0;
p = rpc_encode_header(task); p = rpc_encode_header(task);
if (p == NULL) { if (p == NULL) {
...@@ -1761,6 +1763,34 @@ rpc_xdr_encode(struct rpc_task *task) ...@@ -1761,6 +1763,34 @@ rpc_xdr_encode(struct rpc_task *task)
task->tk_msg.rpc_argp); task->tk_msg.rpc_argp);
} }
/*
* 3. Encode arguments of an RPC call
*/
static void
call_encode(struct rpc_task *task)
{
if (!rpc_task_need_encode(task))
goto out;
/* Encode here so that rpcsec_gss can use correct sequence number. */
rpc_xdr_encode(task);
/* Did the encode result in an error condition? */
if (task->tk_status != 0) {
/* Was the error nonfatal? */
if (task->tk_status == -EAGAIN)
rpc_delay(task, HZ >> 4);
else
rpc_exit(task, task->tk_status);
return;
}
/* Add task to reply queue before transmission to avoid races */
if (rpc_reply_expected(task))
xprt_request_enqueue_receive(task);
xprt_request_enqueue_transmit(task);
out:
task->tk_action = call_bind;
}
/* /*
* 4. Get the server port number if not yet set * 4. Get the server port number if not yet set
*/ */
...@@ -1945,24 +1975,8 @@ call_transmit(struct rpc_task *task) ...@@ -1945,24 +1975,8 @@ call_transmit(struct rpc_task *task)
dprint_status(task); dprint_status(task);
task->tk_action = call_transmit_status; task->tk_action = call_transmit_status;
/* Encode here so that rpcsec_gss can use correct sequence number. */ if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
if (rpc_task_need_encode(task)) {
rpc_xdr_encode(task);
/* Did the encode result in an error condition? */
if (task->tk_status != 0) {
/* Was the error nonfatal? */
if (task->tk_status == -EAGAIN)
rpc_delay(task, HZ >> 4);
else
rpc_exit(task, task->tk_status);
return; return;
}
}
/* Add task to reply queue before transmission to avoid races */
if (rpc_reply_expected(task))
xprt_request_enqueue_receive(task);
xprt_request_enqueue_transmit(task);
if (!xprt_prepare_transmit(task)) if (!xprt_prepare_transmit(task))
return; return;
...@@ -1997,9 +2011,9 @@ call_transmit_status(struct rpc_task *task) ...@@ -1997,9 +2011,9 @@ call_transmit_status(struct rpc_task *task)
xprt_end_transmit(task); xprt_end_transmit(task);
break; break;
case -EBADMSG: case -EBADMSG:
task->tk_action = call_transmit;
task->tk_status = 0;
xprt_end_transmit(task); xprt_end_transmit(task);
task->tk_status = 0;
task->tk_action = call_encode;
break; break;
/* /*
* Special cases: if we've been waiting on the * Special cases: if we've been waiting on the
...@@ -2048,6 +2062,9 @@ call_bc_transmit(struct rpc_task *task) ...@@ -2048,6 +2062,9 @@ call_bc_transmit(struct rpc_task *task)
{ {
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
if (rpc_task_need_encode(task))
xprt_request_enqueue_transmit(task);
if (!xprt_prepare_transmit(task)) if (!xprt_prepare_transmit(task))
goto out_retry; goto out_retry;
...@@ -2169,7 +2186,7 @@ call_status(struct rpc_task *task) ...@@ -2169,7 +2186,7 @@ call_status(struct rpc_task *task)
case -EPIPE: case -EPIPE:
case -ENOTCONN: case -ENOTCONN:
case -EAGAIN: case -EAGAIN:
task->tk_action = call_bind; task->tk_action = call_encode;
break; break;
case -EIO: case -EIO:
/* shutdown or soft timeout */ /* shutdown or soft timeout */
...@@ -2234,7 +2251,7 @@ call_timeout(struct rpc_task *task) ...@@ -2234,7 +2251,7 @@ call_timeout(struct rpc_task *task)
rpcauth_invalcred(task); rpcauth_invalcred(task);
retry: retry:
task->tk_action = call_bind; task->tk_action = call_encode;
task->tk_status = 0; task->tk_status = 0;
} }
...@@ -2278,7 +2295,7 @@ call_decode(struct rpc_task *task) ...@@ -2278,7 +2295,7 @@ call_decode(struct rpc_task *task)
if (req->rq_rcv_buf.len < 12) { if (req->rq_rcv_buf.len < 12) {
if (!RPC_IS_SOFT(task)) { if (!RPC_IS_SOFT(task)) {
task->tk_action = call_bind; task->tk_action = call_encode;
goto out_retry; goto out_retry;
} }
dprintk("RPC: %s: too small RPC reply size (%d bytes)\n", dprintk("RPC: %s: too small RPC reply size (%d bytes)\n",
...@@ -2409,7 +2426,7 @@ rpc_verify_header(struct rpc_task *task) ...@@ -2409,7 +2426,7 @@ rpc_verify_header(struct rpc_task *task)
task->tk_garb_retry--; task->tk_garb_retry--;
dprintk("RPC: %5u %s: retry garbled creds\n", dprintk("RPC: %5u %s: retry garbled creds\n",
task->tk_pid, __func__); task->tk_pid, __func__);
task->tk_action = call_bind; task->tk_action = call_encode;
goto out_retry; goto out_retry;
case RPC_AUTH_TOOWEAK: case RPC_AUTH_TOOWEAK:
printk(KERN_NOTICE "RPC: server %s requires stronger " printk(KERN_NOTICE "RPC: server %s requires stronger "
...@@ -2478,7 +2495,7 @@ rpc_verify_header(struct rpc_task *task) ...@@ -2478,7 +2495,7 @@ rpc_verify_header(struct rpc_task *task)
task->tk_garb_retry--; task->tk_garb_retry--;
dprintk("RPC: %5u %s: retrying\n", dprintk("RPC: %5u %s: retrying\n",
task->tk_pid, __func__); task->tk_pid, __func__);
task->tk_action = call_bind; task->tk_action = call_encode;
out_retry: out_retry:
return ERR_PTR(-EAGAIN); return ERR_PTR(-EAGAIN);
} }
......
...@@ -1058,18 +1058,10 @@ void xprt_request_wait_receive(struct rpc_task *task) ...@@ -1058,18 +1058,10 @@ void xprt_request_wait_receive(struct rpc_task *task)
spin_unlock(&xprt->queue_lock); spin_unlock(&xprt->queue_lock);
} }
static bool
xprt_request_need_transmit(struct rpc_task *task)
{
return !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
xprt_request_retransmit_after_disconnect(task);
}
static bool static bool
xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req) xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
{ {
return xprt_request_need_transmit(task) && return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
} }
/** /**
...@@ -1124,6 +1116,18 @@ xprt_request_dequeue_transmit(struct rpc_task *task) ...@@ -1124,6 +1116,18 @@ xprt_request_dequeue_transmit(struct rpc_task *task)
spin_unlock(&xprt->queue_lock); spin_unlock(&xprt->queue_lock);
} }
/**
* xprt_request_need_retransmit - Test if a task needs retransmission
* @task: pointer to rpc_task
*
* Test for whether a connection breakage requires the task to retransmit
*/
bool
xprt_request_need_retransmit(struct rpc_task *task)
{
return xprt_request_retransmit_after_disconnect(task);
}
/** /**
* xprt_prepare_transmit - reserve the transport before sending a request * xprt_prepare_transmit - reserve the transport before sending a request
* @task: RPC task about to send a request * @task: RPC task about to send a request
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment