Commit ca4faf54 authored by Chuck Lever's avatar Chuck Lever

SUNRPC: Move xpt_mutex into socket xpo_sendto methods

It appears that the RPC/RDMA transport does not need serialization
of calls to its xpo_sendto method. Move the mutex into the socket
methods that still need that serialization.

Tail latencies are unambiguously better with this patch applied.
fio randrw 8KB 70/30 on NFSv3, smaller numbers are better:

    clat percentiles (usec):

With xpt_mutex:
r    | 99.99th=[ 8848]
w    | 99.99th=[ 9634]

Without xpt_mutex:
r    | 99.99th=[ 8586]
w    | 99.99th=[ 8979]

Serializing the construction of RPC/RDMA transport headers is not
really necessary at this point, because the Linux NFS server
implementation never changes its credit grant on a connection. If
that should change, then svc_rdma_sendto will need to serialize
access to the transport's credit grant fields.
Reported-by: default avatarkbuild test robot <lkp@intel.com>
[ cel: fix uninitialized variable warning ]
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent b9bbe6ed
...@@ -117,6 +117,12 @@ static inline int register_xpt_user(struct svc_xprt *xpt, struct svc_xpt_user *u ...@@ -117,6 +117,12 @@ static inline int register_xpt_user(struct svc_xprt *xpt, struct svc_xpt_user *u
return 0; return 0;
} }
static inline bool svc_xprt_is_dead(const struct svc_xprt *xprt)
{
return (test_bit(XPT_DEAD, &xprt->xpt_flags) != 0) ||
(test_bit(XPT_CLOSE, &xprt->xpt_flags) != 0);
}
int svc_reg_xprt_class(struct svc_xprt_class *); int svc_reg_xprt_class(struct svc_xprt_class *);
void svc_unreg_xprt_class(struct svc_xprt_class *); void svc_unreg_xprt_class(struct svc_xprt_class *);
void svc_xprt_init(struct net *, struct svc_xprt_class *, struct svc_xprt *, void svc_xprt_init(struct net *, struct svc_xprt_class *, struct svc_xprt *,
......
...@@ -914,16 +914,10 @@ int svc_send(struct svc_rqst *rqstp) ...@@ -914,16 +914,10 @@ int svc_send(struct svc_rqst *rqstp)
xb->page_len + xb->page_len +
xb->tail[0].iov_len; xb->tail[0].iov_len;
trace_svc_sendto(xb); trace_svc_sendto(xb);
/* Grab mutex to serialize outgoing data. */
mutex_lock(&xprt->xpt_mutex);
trace_svc_stats_latency(rqstp); trace_svc_stats_latency(rqstp);
if (test_bit(XPT_DEAD, &xprt->xpt_flags)
|| test_bit(XPT_CLOSE, &xprt->xpt_flags)) len = xprt->xpt_ops->xpo_sendto(rqstp);
len = -ENOTCONN;
else
len = xprt->xpt_ops->xpo_sendto(rqstp);
mutex_unlock(&xprt->xpt_mutex);
trace_svc_send(rqstp, len); trace_svc_send(rqstp, len);
svc_xprt_release(rqstp); svc_xprt_release(rqstp);
......
...@@ -506,6 +506,9 @@ static int svc_udp_recvfrom(struct svc_rqst *rqstp) ...@@ -506,6 +506,9 @@ static int svc_udp_recvfrom(struct svc_rqst *rqstp)
* svc_udp_sendto - Send out a reply on a UDP socket * svc_udp_sendto - Send out a reply on a UDP socket
* @rqstp: completed svc_rqst * @rqstp: completed svc_rqst
* *
* xpt_mutex ensures @rqstp's whole message is written to the socket
* without interruption.
*
* Returns the number of bytes sent, or a negative errno. * Returns the number of bytes sent, or a negative errno.
*/ */
static int svc_udp_sendto(struct svc_rqst *rqstp) static int svc_udp_sendto(struct svc_rqst *rqstp)
...@@ -531,6 +534,11 @@ static int svc_udp_sendto(struct svc_rqst *rqstp) ...@@ -531,6 +534,11 @@ static int svc_udp_sendto(struct svc_rqst *rqstp)
svc_set_cmsg_data(rqstp, cmh); svc_set_cmsg_data(rqstp, cmh);
mutex_lock(&xprt->xpt_mutex);
if (svc_xprt_is_dead(xprt))
goto out_notconn;
err = xprt_sock_sendmsg(svsk->sk_sock, &msg, xdr, 0, 0, &sent); err = xprt_sock_sendmsg(svsk->sk_sock, &msg, xdr, 0, 0, &sent);
xdr_free_bvec(xdr); xdr_free_bvec(xdr);
if (err == -ECONNREFUSED) { if (err == -ECONNREFUSED) {
...@@ -538,9 +546,15 @@ static int svc_udp_sendto(struct svc_rqst *rqstp) ...@@ -538,9 +546,15 @@ static int svc_udp_sendto(struct svc_rqst *rqstp)
err = xprt_sock_sendmsg(svsk->sk_sock, &msg, xdr, 0, 0, &sent); err = xprt_sock_sendmsg(svsk->sk_sock, &msg, xdr, 0, 0, &sent);
xdr_free_bvec(xdr); xdr_free_bvec(xdr);
} }
mutex_unlock(&xprt->xpt_mutex);
if (err < 0) if (err < 0)
return err; return err;
return sent; return sent;
out_notconn:
mutex_unlock(&xprt->xpt_mutex);
return -ENOTCONN;
} }
static int svc_udp_has_wspace(struct svc_xprt *xprt) static int svc_udp_has_wspace(struct svc_xprt *xprt)
...@@ -1063,6 +1077,9 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp) ...@@ -1063,6 +1077,9 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
* svc_tcp_sendto - Send out a reply on a TCP socket * svc_tcp_sendto - Send out a reply on a TCP socket
* @rqstp: completed svc_rqst * @rqstp: completed svc_rqst
* *
* xpt_mutex ensures @rqstp's whole message is written to the socket
* without interruption.
*
* Returns the number of bytes sent, or a negative errno. * Returns the number of bytes sent, or a negative errno.
*/ */
static int svc_tcp_sendto(struct svc_rqst *rqstp) static int svc_tcp_sendto(struct svc_rqst *rqstp)
...@@ -1080,12 +1097,19 @@ static int svc_tcp_sendto(struct svc_rqst *rqstp) ...@@ -1080,12 +1097,19 @@ static int svc_tcp_sendto(struct svc_rqst *rqstp)
svc_release_skb(rqstp); svc_release_skb(rqstp);
mutex_lock(&xprt->xpt_mutex);
if (svc_xprt_is_dead(xprt))
goto out_notconn;
err = xprt_sock_sendmsg(svsk->sk_sock, &msg, xdr, 0, marker, &sent); err = xprt_sock_sendmsg(svsk->sk_sock, &msg, xdr, 0, marker, &sent);
xdr_free_bvec(xdr); xdr_free_bvec(xdr);
if (err < 0 || sent != (xdr->len + sizeof(marker))) if (err < 0 || sent != (xdr->len + sizeof(marker)))
goto out_close; goto out_close;
mutex_unlock(&xprt->xpt_mutex);
return sent; return sent;
out_notconn:
mutex_unlock(&xprt->xpt_mutex);
return -ENOTCONN;
out_close: out_close:
pr_notice("rpc-srv/tcp: %s: %s %d when sending %d bytes - shutting down socket\n", pr_notice("rpc-srv/tcp: %s: %s %d when sending %d bytes - shutting down socket\n",
xprt->xpt_server->sv_name, xprt->xpt_server->sv_name,
...@@ -1093,6 +1117,7 @@ static int svc_tcp_sendto(struct svc_rqst *rqstp) ...@@ -1093,6 +1117,7 @@ static int svc_tcp_sendto(struct svc_rqst *rqstp)
(err < 0) ? err : sent, xdr->len); (err < 0) ? err : sent, xdr->len);
set_bit(XPT_CLOSE, &xprt->xpt_flags); set_bit(XPT_CLOSE, &xprt->xpt_flags);
svc_xprt_enqueue(xprt); svc_xprt_enqueue(xprt);
mutex_unlock(&xprt->xpt_mutex);
return -EAGAIN; return -EAGAIN;
} }
......
...@@ -210,34 +210,31 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst) ...@@ -210,34 +210,31 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
return -ENOTCONN; return -ENOTCONN;
} }
/* Send an RPC call on the passive end of a transport /**
* connection. * xprt_rdma_bc_send_request - Send a reverse-direction Call
* @rqst: rpc_rqst containing Call message to be sent
*
* Return values:
* %0 if the message was sent successfully
* %ENOTCONN if the message was not sent
*/ */
static int static int xprt_rdma_bc_send_request(struct rpc_rqst *rqst)
xprt_rdma_bc_send_request(struct rpc_rqst *rqst)
{ {
struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt; struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
struct svcxprt_rdma *rdma; struct svcxprt_rdma *rdma =
container_of(sxprt, struct svcxprt_rdma, sc_xprt);
int ret; int ret;
dprintk("svcrdma: sending bc call with xid: %08x\n", dprintk("svcrdma: sending bc call with xid: %08x\n",
be32_to_cpu(rqst->rq_xid)); be32_to_cpu(rqst->rq_xid));
mutex_lock(&sxprt->xpt_mutex); if (test_bit(XPT_DEAD, &sxprt->xpt_flags))
return -ENOTCONN;
ret = -ENOTCONN;
rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
if (!test_bit(XPT_DEAD, &sxprt->xpt_flags)) {
ret = rpcrdma_bc_send_request(rdma, rqst);
if (ret == -ENOTCONN)
svc_close_xprt(sxprt);
}
mutex_unlock(&sxprt->xpt_mutex); ret = rpcrdma_bc_send_request(rdma, rqst);
if (ret == -ENOTCONN)
if (ret < 0) svc_close_xprt(sxprt);
return ret; return ret;
return 0;
} }
static void static void
......
...@@ -868,12 +868,10 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) ...@@ -868,12 +868,10 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
__be32 *p; __be32 *p;
int ret; int ret;
/* Create the RDMA response header. xprt->xpt_mutex, ret = -ENOTCONN;
* acquired in svc_send(), serializes RPC replies. The if (svc_xprt_is_dead(xprt))
* code path below that inserts the credit grant value goto err0;
* into each transport header runs only inside this
* critical section.
*/
ret = -ENOMEM; ret = -ENOMEM;
sctxt = svc_rdma_send_ctxt_get(rdma); sctxt = svc_rdma_send_ctxt_get(rdma);
if (!sctxt) if (!sctxt)
......
...@@ -2548,8 +2548,16 @@ static int bc_sendto(struct rpc_rqst *req) ...@@ -2548,8 +2548,16 @@ static int bc_sendto(struct rpc_rqst *req)
return sent; return sent;
} }
/* /**
* The send routine. Borrows from svc_send * bc_send_request - Send a backchannel Call on a TCP socket
* @req: rpc_rqst containing Call message to be sent
*
* xpt_mutex ensures @rqstp's whole message is written to the socket
* without interruption.
*
* Return values:
* %0 if the message was sent successfully
* %ENOTCONN if the message was not sent
*/ */
static int bc_send_request(struct rpc_rqst *req) static int bc_send_request(struct rpc_rqst *req)
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment