Commit 89f90fe1 authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Allow calls to xprt_transmit() to drain the entire transmit queue

Rather than forcing each and every RPC task to grab the socket write
lock in order to send itself, we allow whichever task is holding the
write lock to attempt to drain the entire transmit queue.
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@hammerspace.com>
parent 86aeee0e
...@@ -1224,15 +1224,20 @@ void xprt_end_transmit(struct rpc_task *task) ...@@ -1224,15 +1224,20 @@ void xprt_end_transmit(struct rpc_task *task)
} }
/** /**
* xprt_transmit - send an RPC request on a transport * xprt_request_transmit - send an RPC request on a transport
* @task: controlling RPC task * @req: pointer to request to transmit
* @snd_task: RPC task that owns the transport lock
* *
* We have to copy the iovec because sendmsg fiddles with its contents. * This performs the transmission of a single request.
* Note that if the request is not the same as snd_task, then it
* does need to be pinned.
* Returns '0' on success.
*/ */
void xprt_transmit(struct rpc_task *task) static int
xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
{ {
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt; struct rpc_xprt *xprt = req->rq_xprt;
struct rpc_task *task = req->rq_task;
unsigned int connect_cookie; unsigned int connect_cookie;
int is_retrans = RPC_WAS_SENT(task); int is_retrans = RPC_WAS_SENT(task);
int status; int status;
...@@ -1240,11 +1245,13 @@ void xprt_transmit(struct rpc_task *task) ...@@ -1240,11 +1245,13 @@ void xprt_transmit(struct rpc_task *task)
dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
if (!req->rq_bytes_sent) { if (!req->rq_bytes_sent) {
if (xprt_request_data_received(task)) if (xprt_request_data_received(task)) {
status = 0;
goto out_dequeue; goto out_dequeue;
}
/* Verify that our message lies in the RPCSEC_GSS window */ /* Verify that our message lies in the RPCSEC_GSS window */
if (rpcauth_xmit_need_reencode(task)) { if (rpcauth_xmit_need_reencode(task)) {
task->tk_status = -EBADMSG; status = -EBADMSG;
goto out_dequeue; goto out_dequeue;
} }
} }
...@@ -1257,12 +1264,11 @@ void xprt_transmit(struct rpc_task *task) ...@@ -1257,12 +1264,11 @@ void xprt_transmit(struct rpc_task *task)
req->rq_ntrans++; req->rq_ntrans++;
connect_cookie = xprt->connect_cookie; connect_cookie = xprt->connect_cookie;
status = xprt->ops->send_request(req, task); status = xprt->ops->send_request(req, snd_task);
trace_xprt_transmit(xprt, req->rq_xid, status); trace_xprt_transmit(xprt, req->rq_xid, status);
if (status != 0) { if (status != 0) {
req->rq_ntrans--; req->rq_ntrans--;
task->tk_status = status; return status;
return;
} }
if (is_retrans) if (is_retrans)
...@@ -1284,6 +1290,49 @@ void xprt_transmit(struct rpc_task *task) ...@@ -1284,6 +1290,49 @@ void xprt_transmit(struct rpc_task *task)
req->rq_connect_cookie = connect_cookie; req->rq_connect_cookie = connect_cookie;
out_dequeue: out_dequeue:
xprt_request_dequeue_transmit(task); xprt_request_dequeue_transmit(task);
rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
return status;
}
/**
* xprt_transmit - send an RPC request on a transport
* @task: controlling RPC task
*
* Attempts to drain the transmit queue. On exit, either the transport
* signalled an error that needs to be handled before transmission can
* resume, or @task finished transmitting, and detected that it already
* received a reply.
*/
void
xprt_transmit(struct rpc_task *task)
{
struct rpc_rqst *next, *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
int status;
spin_lock(&xprt->queue_lock);
while (!list_empty(&xprt->xmit_queue)) {
next = list_first_entry(&xprt->xmit_queue,
struct rpc_rqst, rq_xmit);
xprt_pin_rqst(next);
spin_unlock(&xprt->queue_lock);
status = xprt_request_transmit(next, task);
if (status == -EBADMSG && next != req)
status = 0;
cond_resched();
spin_lock(&xprt->queue_lock);
xprt_unpin_rqst(next);
if (status == 0) {
if (!xprt_request_data_received(task) ||
test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
continue;
} else if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
rpc_wake_up_queued_task(&xprt->pending, task);
else
task->tk_status = status;
break;
}
spin_unlock(&xprt->queue_lock);
} }
static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment