Commit f6ad7759 authored by Chuck Lever's avatar Chuck Lever

svcrdma: Post RDMA Writes while XDR encoding replies

The only RPC/RDMA ordering requirement between RDMA Writes and RDMA
Sends is that the responder must post the Writes on the Send queue
before posting the Send that conveys the RPC Reply for that Write
payload.

The Linux NFS server implementation now has a transport method that
can post result Payload Writes earlier than svc_rdma_sendto:

   ->xpo_result_payload()

This gets RDMA Writes going earlier so they are more likely to be
complete at the remote end before the Send completes.

Some care must be taken with pulled-up Replies. We don't want to
push the Write chunk and then send the same payload data via Send.
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent 76e5492b
......@@ -183,9 +183,7 @@ extern int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma,
struct svc_rqst *rqstp,
struct svc_rdma_recv_ctxt *head, __be32 *p);
extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
__be32 *wr_ch, struct xdr_buf *xdr,
unsigned int offset,
unsigned long length);
__be32 *wr_ch, const struct xdr_buf *xdr);
extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
const struct svc_rdma_recv_ctxt *rctxt,
struct xdr_buf *xdr);
......
......@@ -538,13 +538,49 @@ static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
length);
}
/**
* svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
* @xdr: xdr_buf to write
* @info: pointer to write arguments
*
* Returns:
* On succes, returns zero
* %-E2BIG if the client-provided Write chunk is too small
* %-ENOMEM if a resource has been exhausted
* %-EIO if an rdma-rw error occurred
*/
static int svc_rdma_xb_write(const struct xdr_buf *xdr,
struct svc_rdma_write_info *info)
{
int ret;
if (xdr->head[0].iov_len) {
ret = svc_rdma_iov_write(info, &xdr->head[0]);
if (ret < 0)
return ret;
}
if (xdr->page_len) {
ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
xdr->page_len);
if (ret < 0)
return ret;
}
if (xdr->tail[0].iov_len) {
ret = svc_rdma_iov_write(info, &xdr->tail[0]);
if (ret < 0)
return ret;
}
return xdr->len;
}
/**
* svc_rdma_send_write_chunk - Write all segments in a Write chunk
* @rdma: controlling RDMA transport
* @wr_ch: Write chunk provided by client
* @xdr: xdr_buf containing the data payload
* @offset: payload's byte offset in @xdr
* @length: size of payload, in bytes
*
* Returns a non-negative number of bytes the chunk consumed, or
* %-E2BIG if the payload was larger than the Write chunk,
......@@ -554,21 +590,17 @@ static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
* %-EIO if rdma_rw initialization failed (DMA mapping, etc).
*/
int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
struct xdr_buf *xdr,
unsigned int offset, unsigned long length)
const struct xdr_buf *xdr)
{
struct svc_rdma_write_info *info;
int ret;
if (!length)
return 0;
info = svc_rdma_write_info_alloc(rdma, wr_ch);
if (!info)
return -ENOMEM;
ret = svc_rdma_pages_write(info, xdr, offset, length);
if (ret < 0)
ret = svc_rdma_xb_write(xdr, info);
if (ret != xdr->len)
goto out_err;
ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
......@@ -576,7 +608,7 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
goto out_err;
trace_svcrdma_send_write_chunk(xdr->page_len);
return length;
return xdr->len;
out_err:
svc_rdma_write_info_free(info);
......
......@@ -468,11 +468,14 @@ svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
{
ssize_t len, ret;
ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt,
rctxt->rc_read_payload_length);
if (ret < 0)
return ret;
len = ret;
len = 0;
if (rctxt->rc_write_list) {
ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt,
rctxt->rc_read_payload_length);
if (ret < 0)
return ret;
len = ret;
}
/* Terminate the Write list */
ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
......@@ -556,11 +559,13 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
const struct svc_rdma_recv_ctxt *rctxt,
struct xdr_buf *xdr)
{
bool write_chunk_present = rctxt && rctxt->rc_write_list;
int elements;
/* For small messages, copying bytes is cheaper than DMA mapping.
*/
if (sctxt->sc_hdrbuf.len + xdr->len < RPCRDMA_PULLUP_THRESH)
if (!write_chunk_present &&
sctxt->sc_hdrbuf.len + xdr->len < RPCRDMA_PULLUP_THRESH)
return true;
/* Check whether the xdr_buf has more elements than can
......@@ -893,9 +898,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
container_of(xprt, struct svcxprt_rdma, sc_xprt);
struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
__be32 *rdma_argp = rctxt->rc_recv_buf;
__be32 *wr_lst = rctxt->rc_write_list;
__be32 *rp_ch = rctxt->rc_reply_chunk;
struct xdr_buf *xdr = &rqstp->rq_res;
struct svc_rdma_send_ctxt *sctxt;
__be32 *p;
int ret;
......@@ -920,19 +923,8 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
if (svc_rdma_encode_read_list(sctxt) < 0)
goto err0;
if (wr_lst) {
/* XXX: Presume the client sent only one Write chunk */
ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr,
rctxt->rc_read_payload_offset,
rctxt->rc_read_payload_length);
if (ret < 0)
goto err2;
if (svc_rdma_encode_write_list(rctxt, sctxt) < 0)
goto err0;
} else {
if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
goto err0;
}
if (svc_rdma_encode_write_list(rctxt, sctxt) < 0)
goto err0;
if (rp_ch) {
ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
if (ret < 0)
......@@ -974,16 +966,25 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
* @offset: payload's byte offset in @xdr
* @length: size of payload, in bytes
*
* Returns zero on success.
*
* For the moment, just record the xdr_buf location of the result
* payload. svc_rdma_sendto will use that location later when
* we actually send the payload.
* Return values:
* %0 if successful or nothing needed to be done
* %-EMSGSIZE on XDR buffer overflow
* %-E2BIG if the payload was larger than the Write chunk
* %-EINVAL if client provided too many segments
* %-ENOMEM if rdma_rw context pool was exhausted
* %-ENOTCONN if posting failed (connection is lost)
* %-EIO if rdma_rw initialization failed (DMA mapping, etc)
*/
int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
unsigned int length)
{
struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
struct svcxprt_rdma *rdma;
struct xdr_buf subbuf;
int ret;
if (!rctxt->rc_write_list || !length)
return 0;
/* XXX: Just one READ payload slot for now, since our
* transport implementation currently supports only one
......@@ -992,5 +993,12 @@ int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
rctxt->rc_read_payload_offset = offset;
rctxt->rc_read_payload_length = length;
if (xdr_buf_subsegment(&rqstp->rq_res, &subbuf, offset, length))
return -EMSGSIZE;
rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
ret = svc_rdma_send_write_chunk(rdma, rctxt->rc_write_list, &subbuf);
if (ret < 0)
return ret;
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment