Commit 7a1cbfa1 authored by Chuck Lever

svcrdma: Use parsed chunk lists to construct RDMA Writes

Refactor: Instead of re-parsing the ingress RPC Call transport
header when constructing RDMA Writes, use the new parsed chunk lists
for the Write list and Reply chunk, which are version-agnostic and
already XDR-decoded.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
parent 58b2e0fe
...@@ -157,8 +157,6 @@ struct svc_rdma_recv_ctxt { ...@@ -157,8 +157,6 @@ struct svc_rdma_recv_ctxt {
__be32 *rc_reply_chunk; __be32 *rc_reply_chunk;
struct svc_rdma_pcl rc_reply_pcl; struct svc_rdma_pcl rc_reply_pcl;
unsigned int rc_read_payload_offset;
unsigned int rc_read_payload_length;
struct page *rc_pages[RPCSVC_MAXPAGES]; struct page *rc_pages[RPCSVC_MAXPAGES];
}; };
...@@ -196,7 +194,8 @@ extern int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, ...@@ -196,7 +194,8 @@ extern int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma,
struct svc_rqst *rqstp, struct svc_rqst *rqstp,
struct svc_rdma_recv_ctxt *head, __be32 *p); struct svc_rdma_recv_ctxt *head, __be32 *p);
extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
__be32 *wr_ch, const struct xdr_buf *xdr); const struct svc_rdma_chunk *chunk,
const struct xdr_buf *xdr);
extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
const struct svc_rdma_recv_ctxt *rctxt, const struct svc_rdma_recv_ctxt *rctxt,
struct xdr_buf *xdr); struct xdr_buf *xdr);
......
...@@ -207,7 +207,6 @@ svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma) ...@@ -207,7 +207,6 @@ svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
out: out:
ctxt->rc_page_count = 0; ctxt->rc_page_count = 0;
ctxt->rc_read_payload_length = 0;
return ctxt; return ctxt;
out_empty: out_empty:
......
...@@ -190,11 +190,11 @@ static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc, ...@@ -190,11 +190,11 @@ static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
* - Stores arguments for the SGL constructor functions * - Stores arguments for the SGL constructor functions
*/ */
struct svc_rdma_write_info { struct svc_rdma_write_info {
const struct svc_rdma_chunk *wi_chunk;
/* write state of this chunk */ /* write state of this chunk */
unsigned int wi_seg_off; unsigned int wi_seg_off;
unsigned int wi_seg_no; unsigned int wi_seg_no;
unsigned int wi_nsegs;
__be32 *wi_segs;
/* SGL constructor arguments */ /* SGL constructor arguments */
const struct xdr_buf *wi_xdr; const struct xdr_buf *wi_xdr;
...@@ -205,7 +205,8 @@ struct svc_rdma_write_info { ...@@ -205,7 +205,8 @@ struct svc_rdma_write_info {
}; };
static struct svc_rdma_write_info * static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk) svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
const struct svc_rdma_chunk *chunk)
{ {
struct svc_rdma_write_info *info; struct svc_rdma_write_info *info;
...@@ -213,10 +214,9 @@ svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk) ...@@ -213,10 +214,9 @@ svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
if (!info) if (!info)
return info; return info;
info->wi_chunk = chunk;
info->wi_seg_off = 0; info->wi_seg_off = 0;
info->wi_seg_no = 0; info->wi_seg_no = 0;
info->wi_nsegs = be32_to_cpup(++chunk);
info->wi_segs = ++chunk;
svc_rdma_cc_init(rdma, &info->wi_cc); svc_rdma_cc_init(rdma, &info->wi_cc);
info->wi_cc.cc_cqe.done = svc_rdma_write_done; info->wi_cc.cc_cqe.done = svc_rdma_write_done;
return info; return info;
...@@ -443,23 +443,19 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info, ...@@ -443,23 +443,19 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info,
{ {
struct svc_rdma_chunk_ctxt *cc = &info->wi_cc; struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
struct svcxprt_rdma *rdma = cc->cc_rdma; struct svcxprt_rdma *rdma = cc->cc_rdma;
const struct svc_rdma_segment *seg;
struct svc_rdma_rw_ctxt *ctxt; struct svc_rdma_rw_ctxt *ctxt;
__be32 *seg;
int ret; int ret;
seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
do { do {
unsigned int write_len; unsigned int write_len;
u32 handle, length;
u64 offset; u64 offset;
if (info->wi_seg_no >= info->wi_nsegs) seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
if (!seg)
goto out_overflow; goto out_overflow;
xdr_decode_rdma_segment(seg, &handle, &length, &offset); write_len = min(remaining, seg->rs_length - info->wi_seg_off);
offset += info->wi_seg_off;
write_len = min(remaining, length - info->wi_seg_off);
if (!write_len) if (!write_len)
goto out_overflow; goto out_overflow;
ctxt = svc_rdma_get_rw_ctxt(rdma, ctxt = svc_rdma_get_rw_ctxt(rdma,
...@@ -468,17 +464,17 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info, ...@@ -468,17 +464,17 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info,
return -ENOMEM; return -ENOMEM;
constructor(info, write_len, ctxt); constructor(info, write_len, ctxt);
ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, handle, offset = seg->rs_offset + info->wi_seg_off;
ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
DMA_TO_DEVICE); DMA_TO_DEVICE);
if (ret < 0) if (ret < 0)
return -EIO; return -EIO;
trace_svcrdma_send_wseg(handle, write_len, offset); trace_svcrdma_send_wseg(seg->rs_handle, write_len, offset);
list_add(&ctxt->rw_list, &cc->cc_rwctxts); list_add(&ctxt->rw_list, &cc->cc_rwctxts);
cc->cc_sqecount += ret; cc->cc_sqecount += ret;
if (write_len == length - info->wi_seg_off) { if (write_len == seg->rs_length - info->wi_seg_off) {
seg += 4;
info->wi_seg_no++; info->wi_seg_no++;
info->wi_seg_off = 0; info->wi_seg_off = 0;
} else { } else {
...@@ -491,7 +487,7 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info, ...@@ -491,7 +487,7 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info,
out_overflow: out_overflow:
trace_svcrdma_small_wrch_err(rdma, remaining, info->wi_seg_no, trace_svcrdma_small_wrch_err(rdma, remaining, info->wi_seg_no,
info->wi_nsegs); info->wi_chunk->ch_segcount);
return -E2BIG; return -E2BIG;
} }
...@@ -579,7 +575,7 @@ static int svc_rdma_xb_write(const struct xdr_buf *xdr, ...@@ -579,7 +575,7 @@ static int svc_rdma_xb_write(const struct xdr_buf *xdr,
/** /**
* svc_rdma_send_write_chunk - Write all segments in a Write chunk * svc_rdma_send_write_chunk - Write all segments in a Write chunk
* @rdma: controlling RDMA transport * @rdma: controlling RDMA transport
* @wr_ch: Write chunk provided by client * @chunk: Write chunk provided by the client
* @xdr: xdr_buf containing the data payload * @xdr: xdr_buf containing the data payload
* *
* Returns a non-negative number of bytes the chunk consumed, or * Returns a non-negative number of bytes the chunk consumed, or
...@@ -589,13 +585,14 @@ static int svc_rdma_xb_write(const struct xdr_buf *xdr, ...@@ -589,13 +585,14 @@ static int svc_rdma_xb_write(const struct xdr_buf *xdr,
* %-ENOTCONN if posting failed (connection is lost), * %-ENOTCONN if posting failed (connection is lost),
* %-EIO if rdma_rw initialization failed (DMA mapping, etc). * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
*/ */
int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch, int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
const struct svc_rdma_chunk *chunk,
const struct xdr_buf *xdr) const struct xdr_buf *xdr)
{ {
struct svc_rdma_write_info *info; struct svc_rdma_write_info *info;
int ret; int ret;
info = svc_rdma_write_info_alloc(rdma, wr_ch); info = svc_rdma_write_info_alloc(rdma, chunk);
if (!info) if (!info)
return -ENOMEM; return -ENOMEM;
...@@ -633,12 +630,14 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, ...@@ -633,12 +630,14 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
struct xdr_buf *xdr) struct xdr_buf *xdr)
{ {
struct svc_rdma_write_info *info; struct svc_rdma_write_info *info;
struct svc_rdma_chunk *chunk;
int consumed, ret; int consumed, ret;
if (!rctxt->rc_reply_chunk) if (pcl_is_empty(&rctxt->rc_reply_pcl))
return 0; return 0;
info = svc_rdma_write_info_alloc(rdma, rctxt->rc_reply_chunk); chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
info = svc_rdma_write_info_alloc(rdma, chunk);
if (!info) if (!info)
return -ENOMEM; return -ENOMEM;
...@@ -650,7 +649,7 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, ...@@ -650,7 +649,7 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
/* Send the page list in the Reply chunk only if the /* Send the page list in the Reply chunk only if the
* client did not provide Write chunks. * client did not provide Write chunks.
*/ */
if (!rctxt->rc_write_list && xdr->page_len) { if (pcl_is_empty(&rctxt->rc_write_pcl) && xdr->page_len) {
ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len, ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
xdr->page_len); xdr->page_len);
if (ret < 0) if (ret < 0)
......
...@@ -466,12 +466,14 @@ static ssize_t ...@@ -466,12 +466,14 @@ static ssize_t
svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt, svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
struct svc_rdma_send_ctxt *sctxt) struct svc_rdma_send_ctxt *sctxt)
{ {
struct svc_rdma_chunk *chunk;
ssize_t len, ret; ssize_t len, ret;
len = 0; len = 0;
if (rctxt->rc_write_list) { if (rctxt->rc_write_list) {
chunk = pcl_first_chunk(&rctxt->rc_write_pcl);
ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt, ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt,
rctxt->rc_read_payload_length); chunk->ch_payload_length);
if (ret < 0) if (ret < 0)
return ret; return ret;
len = ret; len = ret;
...@@ -978,25 +980,27 @@ int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset, ...@@ -978,25 +980,27 @@ int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
unsigned int length) unsigned int length)
{ {
struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt; struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
struct svc_rdma_chunk *chunk;
struct svcxprt_rdma *rdma; struct svcxprt_rdma *rdma;
struct xdr_buf subbuf; struct xdr_buf subbuf;
int ret; int ret;
if (!rctxt->rc_write_list || !length) chunk = rctxt->rc_cur_result_payload;
if (!length || !chunk)
return 0; return 0;
rctxt->rc_cur_result_payload =
pcl_next_chunk(&rctxt->rc_write_pcl, chunk);
if (length > chunk->ch_length)
return -E2BIG;
/* XXX: Just one READ payload slot for now, since our chunk->ch_position = offset;
* transport implementation currently supports only one chunk->ch_payload_length = length;
* Write chunk.
*/
rctxt->rc_read_payload_offset = offset;
rctxt->rc_read_payload_length = length;
if (xdr_buf_subsegment(&rqstp->rq_res, &subbuf, offset, length)) if (xdr_buf_subsegment(&rqstp->rq_res, &subbuf, offset, length))
return -EMSGSIZE; return -EMSGSIZE;
rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt); rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
ret = svc_rdma_send_write_chunk(rdma, rctxt->rc_write_list, &subbuf); ret = svc_rdma_send_write_chunk(rdma, chunk, &subbuf);
if (ret < 0) if (ret < 0)
return ret; return ret;
return 0; return 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment