Commit 6fd5034d authored by Chuck Lever

svcrdma: Refactor chunk list encoders

Same idea as the receive-side changes I did a while back: use
xdr_stream helpers rather than open-coding the XDR chunk list
encoders. This builds the Reply transport header from beginning to
end without backtracking.

As additional clean-ups, fill in documenting comments for the XDR
encoders and sprinkle some trace points in the new encoding
functions.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
parent 5c266df5
...@@ -149,6 +149,8 @@ struct svc_rdma_send_ctxt { ...@@ -149,6 +149,8 @@ struct svc_rdma_send_ctxt {
struct list_head sc_list; struct list_head sc_list;
struct ib_send_wr sc_send_wr; struct ib_send_wr sc_send_wr;
struct ib_cqe sc_cqe; struct ib_cqe sc_cqe;
struct xdr_buf sc_hdrbuf;
struct xdr_stream sc_stream;
void *sc_xprt_buf; void *sc_xprt_buf;
int sc_page_count; int sc_page_count;
int sc_cur_sge_no; int sc_cur_sge_no;
......
...@@ -181,7 +181,9 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst) ...@@ -181,7 +181,9 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
if (!ctxt) if (!ctxt)
goto drop_connection; goto drop_connection;
p = ctxt->sc_xprt_buf; p = xdr_reserve_space(&ctxt->sc_stream, RPCRDMA_HDRLEN_MIN);
if (!p)
goto put_ctxt;
*p++ = rqst->rq_xid; *p++ = rqst->rq_xid;
*p++ = rpcrdma_version; *p++ = rpcrdma_version;
*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests); *p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
...@@ -189,7 +191,7 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst) ...@@ -189,7 +191,7 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
*p++ = xdr_zero; *p++ = xdr_zero;
*p++ = xdr_zero; *p++ = xdr_zero;
*p = xdr_zero; *p = xdr_zero;
svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_MIN); svc_rdma_sync_reply_hdr(rdma, ctxt, ctxt->sc_hdrbuf.len);
#ifdef SVCRDMA_BACKCHANNEL_DEBUG #ifdef SVCRDMA_BACKCHANNEL_DEBUG
pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer); pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
...@@ -197,12 +199,13 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst) ...@@ -197,12 +199,13 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
rqst->rq_xtime = ktime_get(); rqst->rq_xtime = ktime_get();
rc = svc_rdma_bc_sendto(rdma, rqst, ctxt); rc = svc_rdma_bc_sendto(rdma, rqst, ctxt);
if (rc) { if (rc)
svc_rdma_send_ctxt_put(rdma, ctxt); goto put_ctxt;
goto drop_connection;
}
return 0; return 0;
put_ctxt:
svc_rdma_send_ctxt_put(rdma, ctxt);
drop_connection: drop_connection:
dprintk("svcrdma: failed to send bc call\n"); dprintk("svcrdma: failed to send bc call\n");
return -ENOTCONN; return -ENOTCONN;
......
...@@ -698,7 +698,6 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt, ...@@ -698,7 +698,6 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
__be32 *rdma_argp, int status) __be32 *rdma_argp, int status)
{ {
struct svc_rdma_send_ctxt *ctxt; struct svc_rdma_send_ctxt *ctxt;
unsigned int length;
__be32 *p; __be32 *p;
int ret; int ret;
...@@ -706,29 +705,46 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt, ...@@ -706,29 +705,46 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
if (!ctxt) if (!ctxt)
return; return;
p = ctxt->sc_xprt_buf; p = xdr_reserve_space(&ctxt->sc_stream,
rpcrdma_fixed_maxsz * sizeof(*p));
if (!p)
goto put_ctxt;
*p++ = *rdma_argp; *p++ = *rdma_argp;
*p++ = *(rdma_argp + 1); *p++ = *(rdma_argp + 1);
*p++ = xprt->sc_fc_credits; *p++ = xprt->sc_fc_credits;
*p++ = rdma_error; *p = rdma_error;
switch (status) { switch (status) {
case -EPROTONOSUPPORT: case -EPROTONOSUPPORT:
p = xdr_reserve_space(&ctxt->sc_stream, 3 * sizeof(*p));
if (!p)
goto put_ctxt;
*p++ = err_vers; *p++ = err_vers;
*p++ = rpcrdma_version; *p++ = rpcrdma_version;
*p++ = rpcrdma_version; *p = rpcrdma_version;
trace_svcrdma_err_vers(*rdma_argp); trace_svcrdma_err_vers(*rdma_argp);
break; break;
default: default:
*p++ = err_chunk; p = xdr_reserve_space(&ctxt->sc_stream, sizeof(*p));
if (!p)
goto put_ctxt;
*p = err_chunk;
trace_svcrdma_err_chunk(*rdma_argp); trace_svcrdma_err_chunk(*rdma_argp);
} }
length = (unsigned long)p - (unsigned long)ctxt->sc_xprt_buf;
svc_rdma_sync_reply_hdr(xprt, ctxt, length); svc_rdma_sync_reply_hdr(xprt, ctxt, ctxt->sc_hdrbuf.len);
ctxt->sc_send_wr.opcode = IB_WR_SEND; ctxt->sc_send_wr.opcode = IB_WR_SEND;
ret = svc_rdma_send(xprt, &ctxt->sc_send_wr); ret = svc_rdma_send(xprt, &ctxt->sc_send_wr);
if (ret) if (ret)
svc_rdma_send_ctxt_put(xprt, ctxt); goto put_ctxt;
return;
put_ctxt:
svc_rdma_send_ctxt_put(xprt, ctxt);
} }
/* By convention, backchannel calls arrive via rdma_msg type /* By convention, backchannel calls arrive via rdma_msg type
......
...@@ -151,6 +151,8 @@ svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma) ...@@ -151,6 +151,8 @@ svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED; ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
ctxt->sc_cqe.done = svc_rdma_wc_send; ctxt->sc_cqe.done = svc_rdma_wc_send;
ctxt->sc_xprt_buf = buffer; ctxt->sc_xprt_buf = buffer;
xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
rdma->sc_max_req_size);
ctxt->sc_sges[0].addr = addr; ctxt->sc_sges[0].addr = addr;
for (i = 0; i < rdma->sc_max_send_sges; i++) for (i = 0; i < rdma->sc_max_send_sges; i++)
...@@ -204,6 +206,10 @@ struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma) ...@@ -204,6 +206,10 @@ struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
spin_unlock(&rdma->sc_send_lock); spin_unlock(&rdma->sc_send_lock);
out: out:
rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
ctxt->sc_xprt_buf, NULL);
ctxt->sc_send_wr.num_sge = 0; ctxt->sc_send_wr.num_sge = 0;
ctxt->sc_cur_sge_no = 0; ctxt->sc_cur_sge_no = 0;
ctxt->sc_page_count = 0; ctxt->sc_page_count = 0;
...@@ -322,131 +328,173 @@ int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr) ...@@ -322,131 +328,173 @@ int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
return ret; return ret;
} }
/* Returns length of transport header, in bytes. /**
* svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list
* @sctxt: Send context for the RPC Reply
*
* Return values:
* On success, returns length in bytes of the Reply XDR buffer
* that was consumed by the Reply Read list
* %-EMSGSIZE on XDR buffer overflow
*/ */
static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp) static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
{ {
unsigned int nsegs; /* RPC-over-RDMA version 1 replies never have a Read list. */
__be32 *p; return xdr_stream_encode_item_absent(&sctxt->sc_stream);
}
p = rdma_resp;
/* RPC-over-RDMA V1 replies never have a Read list. */
p += rpcrdma_fixed_maxsz + 1;
/* Skip Write list. */
while (*p++ != xdr_zero) {
nsegs = be32_to_cpup(p++);
p += nsegs * rpcrdma_segment_maxsz;
}
/* Skip Reply chunk. */ /**
if (*p++ != xdr_zero) { * svc_rdma_encode_write_segment - Encode one Write segment
nsegs = be32_to_cpup(p++); * @src: matching Write chunk in the RPC Call header
p += nsegs * rpcrdma_segment_maxsz; * @sctxt: Send context for the RPC Reply
* @remaining: remaining bytes of the payload left in the Write chunk
*
* Return values:
* On success, returns length in bytes of the Reply XDR buffer
* that was consumed by the Write segment
* %-EMSGSIZE on XDR buffer overflow
*/
static ssize_t svc_rdma_encode_write_segment(__be32 *src,
struct svc_rdma_send_ctxt *sctxt,
unsigned int *remaining)
{
__be32 *p;
const size_t len = rpcrdma_segment_maxsz * sizeof(*p);
u32 handle, length;
u64 offset;
p = xdr_reserve_space(&sctxt->sc_stream, len);
if (!p)
return -EMSGSIZE;
handle = be32_to_cpup(src++);
length = be32_to_cpup(src++);
xdr_decode_hyper(src, &offset);
*p++ = cpu_to_be32(handle);
if (*remaining < length) {
/* segment only partly filled */
length = *remaining;
*remaining = 0;
} else {
/* entire segment was consumed */
*remaining -= length;
} }
*p++ = cpu_to_be32(length);
xdr_encode_hyper(p, offset);
return (unsigned long)p - (unsigned long)rdma_resp; trace_svcrdma_encode_wseg(handle, length, offset);
return len;
} }
/* One Write chunk is copied from Call transport header to Reply /**
* transport header. Each segment's length field is updated to * svc_rdma_encode_write_chunk - Encode one Write chunk
* reflect number of bytes consumed in the segment. * @src: matching Write chunk in the RPC Call header
* * @sctxt: Send context for the RPC Reply
* Returns number of segments in this chunk. * @remaining: size in bytes of the payload in the Write chunk
*
* Copy a Write chunk from the Call transport header to the
* Reply transport header. Update each segment's length field
* to reflect the number of bytes written in that segment.
*
* Return values:
* On success, returns length in bytes of the Reply XDR buffer
* that was consumed by the Write chunk
* %-EMSGSIZE on XDR buffer overflow
*/ */
static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src, static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
struct svc_rdma_send_ctxt *sctxt,
unsigned int remaining) unsigned int remaining)
{ {
unsigned int i, nsegs; unsigned int i, nsegs;
u32 seg_len; ssize_t len, ret;
/* Write list discriminator */ len = 0;
*dst++ = *src++; trace_svcrdma_encode_write_chunk(remaining);
/* number of segments in this chunk */ src++;
nsegs = be32_to_cpup(src); ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
*dst++ = *src++; if (ret < 0)
return -EMSGSIZE;
len += ret;
for (i = nsegs; i; i--) { nsegs = be32_to_cpup(src++);
/* segment's RDMA handle */ ret = xdr_stream_encode_u32(&sctxt->sc_stream, nsegs);
*dst++ = *src++; if (ret < 0)
return -EMSGSIZE;
/* bytes returned in this segment */ len += ret;
seg_len = be32_to_cpu(*src);
if (remaining >= seg_len) {
/* entire segment was consumed */
*dst = *src;
remaining -= seg_len;
} else {
/* segment only partly filled */
*dst = cpu_to_be32(remaining);
remaining = 0;
}
dst++; src++;
/* segment's RDMA offset */ for (i = nsegs; i; i--) {
*dst++ = *src++; ret = svc_rdma_encode_write_segment(src, sctxt, &remaining);
*dst++ = *src++; if (ret < 0)
return -EMSGSIZE;
src += rpcrdma_segment_maxsz;
len += ret;
} }
return nsegs; return len;
} }
/* The client provided a Write list in the Call message. Fill in /**
* the segments in the first Write chunk in the Reply's transport * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list
* @rctxt: Reply context with information about the RPC Call
* @sctxt: Send context for the RPC Reply
* @length: size in bytes of the payload in the first Write chunk
*
* The client provides a Write chunk list in the Call message. Fill
* in the segments in the first Write chunk in the Reply's transport
* header with the number of bytes consumed in each segment. * header with the number of bytes consumed in each segment.
* Remaining chunks are returned unused. * Remaining chunks are returned unused.
* *
* Assumptions: * Assumptions:
* - Client has provided only one Write chunk * - Client has provided only one Write chunk
*
* Return values:
* On success, returns length in bytes of the Reply XDR buffer
* that was consumed by the Reply's Write list
* %-EMSGSIZE on XDR buffer overflow
*/ */
static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch, static ssize_t
unsigned int consumed) svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
struct svc_rdma_send_ctxt *sctxt,
unsigned int length)
{ {
unsigned int nsegs; ssize_t len, ret;
__be32 *p, *q;
/* RPC-over-RDMA V1 replies never have a Read list. */
p = rdma_resp + rpcrdma_fixed_maxsz + 1;
q = wr_ch;
while (*q != xdr_zero) {
nsegs = xdr_encode_write_chunk(p, q, consumed);
q += 2 + nsegs * rpcrdma_segment_maxsz;
p += 2 + nsegs * rpcrdma_segment_maxsz;
consumed = 0;
}
/* Terminate Write list */ ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt, length);
*p++ = xdr_zero; if (ret < 0)
return ret;
len = ret;
/* Reply chunk discriminator; may be replaced later */ /* Terminate the Write list */
*p = xdr_zero; ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
if (ret < 0)
return ret;
return len + ret;
} }
/* The client provided a Reply chunk in the Call message. Fill in /**
* the segments in the Reply chunk in the Reply message with the * svc_rdma_encode_reply_chunk - Encode RPC Reply's Reply chunk
* number of bytes consumed in each segment. * @rctxt: Reply context with information about the RPC Call
* @sctxt: Send context for the RPC Reply
* @length: size in bytes of the payload in the Reply chunk
* *
* Assumptions: * Assumptions:
* - Reply can always fit in the provided Reply chunk * - Reply can always fit in the client-provided Reply chunk
*
* Return values:
* On success, returns length in bytes of the Reply XDR buffer
* that was consumed by the Reply's Reply chunk
* %-EMSGSIZE on XDR buffer overflow
*/ */
static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch, static ssize_t
unsigned int consumed) svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt *rctxt,
struct svc_rdma_send_ctxt *sctxt,
unsigned int length)
{ {
__be32 *p; return svc_rdma_encode_write_chunk(rctxt->rc_reply_chunk, sctxt,
length);
/* Find the Reply chunk in the Reply's xprt header.
* RPC-over-RDMA V1 replies never have a Read list.
*/
p = rdma_resp + rpcrdma_fixed_maxsz + 1;
/* Skip past Write list */
while (*p++ != xdr_zero)
p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
xdr_encode_write_chunk(p, rp_ch, consumed);
} }
static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
...@@ -765,14 +813,26 @@ static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma, ...@@ -765,14 +813,26 @@ static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt, struct svc_rdma_send_ctxt *ctxt,
struct svc_rqst *rqstp) struct svc_rqst *rqstp)
{ {
struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
__be32 *rdma_argp = rctxt->rc_recv_buf;
__be32 *p; __be32 *p;
p = ctxt->sc_xprt_buf; rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
trace_svcrdma_err_chunk(*p); xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
p += 3; NULL);
p = xdr_reserve_space(&ctxt->sc_stream, RPCRDMA_HDRLEN_ERR);
if (!p)
return -ENOMSG;
*p++ = *rdma_argp;
*p++ = *(rdma_argp + 1);
*p++ = rdma->sc_fc_credits;
*p++ = rdma_error; *p++ = rdma_error;
*p = err_chunk; *p = err_chunk;
svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_ERR); trace_svcrdma_err_chunk(*rdma_argp);
svc_rdma_sync_reply_hdr(rdma, ctxt, ctxt->sc_hdrbuf.len);
svc_rdma_save_io_pages(rqstp, ctxt); svc_rdma_save_io_pages(rqstp, ctxt);
...@@ -803,7 +863,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) ...@@ -803,7 +863,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
__be32 *rp_ch = rctxt->rc_reply_chunk; __be32 *rp_ch = rctxt->rc_reply_chunk;
struct xdr_buf *xdr = &rqstp->rq_res; struct xdr_buf *xdr = &rqstp->rq_res;
struct svc_rdma_send_ctxt *sctxt; struct svc_rdma_send_ctxt *sctxt;
__be32 *p, *rdma_resp; __be32 *p;
int ret; int ret;
/* Create the RDMA response header. xprt->xpt_mutex, /* Create the RDMA response header. xprt->xpt_mutex,
...@@ -816,19 +876,18 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) ...@@ -816,19 +876,18 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
sctxt = svc_rdma_send_ctxt_get(rdma); sctxt = svc_rdma_send_ctxt_get(rdma);
if (!sctxt) if (!sctxt)
goto err0; goto err0;
rdma_resp = sctxt->sc_xprt_buf;
p = rdma_resp; p = xdr_reserve_space(&sctxt->sc_stream,
rpcrdma_fixed_maxsz * sizeof(*p));
if (!p)
goto err0;
*p++ = *rdma_argp; *p++ = *rdma_argp;
*p++ = *(rdma_argp + 1); *p++ = *(rdma_argp + 1);
*p++ = rdma->sc_fc_credits; *p++ = rdma->sc_fc_credits;
*p++ = rp_ch ? rdma_nomsg : rdma_msg; *p = rp_ch ? rdma_nomsg : rdma_msg;
/* Start with empty chunks */
*p++ = xdr_zero;
*p++ = xdr_zero;
*p = xdr_zero;
if (svc_rdma_encode_read_list(sctxt) < 0)
goto err0;
if (wr_lst) { if (wr_lst) {
/* XXX: Presume the client sent only one Write chunk */ /* XXX: Presume the client sent only one Write chunk */
unsigned long offset; unsigned long offset;
...@@ -845,16 +904,24 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) ...@@ -845,16 +904,24 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
length); length);
if (ret < 0) if (ret < 0)
goto err2; goto err2;
svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret); if (svc_rdma_encode_write_list(rctxt, sctxt, length) < 0)
goto err0;
} else {
if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
goto err0;
} }
if (rp_ch) { if (rp_ch) {
ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res); ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
if (ret < 0) if (ret < 0)
goto err2; goto err2;
svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret); if (svc_rdma_encode_reply_chunk(rctxt, sctxt, ret) < 0)
goto err0;
} else {
if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
goto err0;
} }
svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp)); svc_rdma_sync_reply_hdr(rdma, sctxt, sctxt->sc_hdrbuf.len);
ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp); ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp);
if (ret < 0) if (ret < 0)
goto err1; goto err1;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment