Commit 655fec69 authored by Chuck Lever, committed by Anna Schumaker

xprtrdma: Use gathered Send for large inline messages

An RPC Call message that is sent inline but that has a data payload
(i.e., one or more items in rq_snd_buf's page list) must be "pulled
up":

- call_allocate has to reserve enough RPC Call buffer space to
accommodate the data payload

- call_transmit has to memcpy the rq_snd_buf's page list and tail
into its head iovec before it is sent
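
To make that cost concrete, here is a minimal user-space sketch of
what the pull-up amounts to. The fake_xdr_buf type and the
fake_inline_pullup helper are illustrative stand-ins, not the
kernel's actual code, which operates on struct xdr_buf and struct
page pointers.

#include <stddef.h>
#include <string.h>

/* Simplified stand-in for the kernel's struct xdr_buf. */
struct fake_xdr_buf {
	char	*head;		/* marshaled RPC header (head[0] iovec) */
	size_t	 head_len;
	char	**pages;	/* data payload, one pointer per page */
	size_t	 page_len;	/* total payload bytes in the page list */
	size_t	 page_size;	/* bytes per page, e.g. 4096 */
	char	*tail;		/* XDR pad and trailing bytes */
	size_t	 tail_len;
};

/* Hypothetical pull-up: memcpy the page list and tail behind the head
 * so the whole Call can be sent from one contiguous buffer.  The head
 * buffer must already be large enough for head + pages + tail, which
 * is the extra reservation call_allocate had to make.
 */
static void fake_inline_pullup(struct fake_xdr_buf *buf)
{
	char *dst = buf->head + buf->head_len;
	size_t remaining = buf->page_len;
	size_t i;

	for (i = 0; remaining != 0; i++) {
		size_t n = remaining < buf->page_size ? remaining : buf->page_size;

		memcpy(dst, buf->pages[i], n);	/* CPU touches every payload byte */
		dst += n;
		remaining -= n;
	}
	memcpy(dst, buf->tail, buf->tail_len);

	buf->head_len += buf->page_len + buf->tail_len;
	buf->page_len = 0;
	buf->tail_len = 0;
}

Every payload byte crosses the host CPU once in that loop; the
gathered Send described below avoids the copy entirely.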

As the inline threshold is increased beyond its current 1KB default,
however, data payloads of more than a few KB end up being copied
by the host CPU. For example, if the inline threshold is increased
just to 4KB, then NFS WRITE requests up to 4KB would involve a
memcpy of the NFS WRITE's payload data into the RPC Call buffer.
This is an undesirable amount of participation by the host CPU.

The inline threshold may be much larger than 4KB in the future,
after negotiation with a peer server.

Instead of copying the components of rq_snd_buf into its head iovec,
construct a gather list of these components, and send them all in
place. The same approach is already used in the Linux server's
RPC-over-RDMA reply path.
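
A rough sketch of the gather-list idea follows, reusing the fake
types from the sketch above. fake_sge mimics the shape of struct
ib_sge, fake_dma_map is a hypothetical placeholder for DMA mapping,
and the real implementation is rpcrdma_prepare_send_sges in
rpc_rdma.c.

/* Mimics the shape of struct ib_sge: DMA address, length, lkey. */
struct fake_sge {
	unsigned long long	addr;
	unsigned int		length;
	unsigned int		lkey;
};

/* Hypothetical stand-in for DMA-mapping a buffer or page. */
extern unsigned long long fake_dma_map(void *vaddr, size_t len);

/* Build one SGE per component instead of copying them together:
 *   sge[0]    RPC-over-RDMA transport header
 *   sge[1]    xdr_buf head iovec (RPC header)
 *   sge[2..]  one SGE per payload page
 *   last sge  xdr_buf tail iovec, if any
 * Returns the SGE count to place in ib_send_wr.num_sge.
 */
static unsigned int fake_prepare_send_sges(struct fake_sge *sge,
					   unsigned int lkey,
					   void *rdma_hdr, size_t hdrlen,
					   struct fake_xdr_buf *buf)
{
	size_t remaining = buf->page_len;
	unsigned int n = 0;
	size_t i;

	sge[n].addr = fake_dma_map(rdma_hdr, hdrlen);
	sge[n].length = hdrlen;
	sge[n++].lkey = lkey;

	sge[n].addr = fake_dma_map(buf->head, buf->head_len);
	sge[n].length = buf->head_len;
	sge[n++].lkey = lkey;

	for (i = 0; remaining != 0; i++) {
		size_t len = remaining < buf->page_size ? remaining : buf->page_size;

		sge[n].addr = fake_dma_map(buf->pages[i], len);
		sge[n].length = len;
		sge[n++].lkey = lkey;
		remaining -= len;
	}

	if (buf->tail_len) {
		sge[n].addr = fake_dma_map(buf->tail, buf->tail_len);
		sge[n].length = buf->tail_len;
		sge[n++].lkey = lkey;
	}

	return n;
}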

This mechanism also eliminates the need for rpcrdma_tail_pullup,
which is used to manage the XDR pad and trailing inline content when
a Read list is present.

This requires that the pages in rq_snd_buf's page list be DMA-mapped
during marshaling, and unmapped when a data-bearing RPC is
completed. This is slightly less efficient for very small I/O
payloads, but significantly more efficient as data payload size and
inline threshold increase past a kilobyte.
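
A hedged sketch of the unmap bookkeeping, in the spirit of the new
rl_send_sge[] and rl_mapped_sges fields; the names below are
illustrative, and the kernel's rpcrdma_unmap_sges differs in its
details.

/* Hypothetical stand-in for releasing one DMA mapping. */
extern void fake_dma_unmap(unsigned long long addr, unsigned int len);

/* Per-request record of the SGEs mapped at marshal time, loosely
 * mirroring rl_send_sge[] and rl_mapped_sges in struct rpcrdma_req.
 */
struct fake_req {
	struct fake_sge	mapped[8];	/* SGEs mapped for this request */
	unsigned int	mapped_sges;	/* how many of them are live */
};

/* Called when the data-bearing RPC completes (cf. xprt_rdma_free ->
 * rpcrdma_unmap_sges).  Long-lived buffers such as the RPC-over-RDMA
 * header regbuf keep their mappings; only the mappings taken for this
 * request's payload are torn down.
 */
static void fake_unmap_sges(struct fake_req *req)
{
	unsigned int i;

	for (i = 0; i < req->mapped_sges; i++)
		fake_dma_unmap(req->mapped[i].addr, req->mapped[i].length);
	req->mapped_sges = 0;
}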
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
parent c8b920bb
net/sunrpc/xprtrdma/backchannel.c
@@ -206,7 +206,6 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	struct rpcrdma_msg *headerp;
-	size_t rpclen;
 
 	headerp = rdmab_to_msg(req->rl_rdmabuf);
 	headerp->rm_xid = rqst->rq_xid;
@@ -218,36 +217,10 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
 	headerp->rm_body.rm_chunks[1] = xdr_zero;
 	headerp->rm_body.rm_chunks[2] = xdr_zero;
 
-	rpclen = rqst->rq_svec[0].iov_len;
-
-#ifdef RPCRDMA_BACKCHANNEL_DEBUG
-	pr_info("RPC: %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
-		__func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
-	pr_info("RPC: %s: RPC/RDMA: %*ph\n",
-		__func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
-	pr_info("RPC: %s: RPC: %*ph\n",
-		__func__, (int)rpclen, rqst->rq_svec[0].iov_base);
-#endif
-
-	if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_rdmabuf))
-		goto out_map;
-	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
-	req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
-	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
-
-	if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_sendbuf))
-		goto out_map;
-	req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
-	req->rl_send_iov[1].length = rpclen;
-	req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
-	req->rl_send_wr.num_sge = 2;
+	if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, RPCRDMA_HDRLEN_MIN,
+				       &rqst->rq_snd_buf, rpcrdma_noch))
+		return -EIO;
 
 	return 0;
-
-out_map:
-	pr_err("rpcrdma: failed to DMA map a Send buffer\n");
-	return -EIO;
 }
 
 /**
[The diff for net/sunrpc/xprtrdma/rpc_rdma.c is collapsed.]
net/sunrpc/xprtrdma/transport.c
@@ -499,30 +499,21 @@ rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 	return true;
 }
 
-/* RPC/RDMA marshaling may choose to send payload bearing ops inline,
- * if the resulting Call message is smaller than the inline threshold.
- * The value of the "rq_callsize" argument accounts for RPC header
- * requirements, but not for the data payload in these cases.
- *
- * See rpcrdma_inline_pullup.
- */
 static bool
 rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 		    size_t size, gfp_t flags)
 {
 	struct rpcrdma_regbuf *rb;
-	size_t min_size;
 
 	if (req->rl_sendbuf && rdmab_length(req->rl_sendbuf) >= size)
 		return true;
 
-	min_size = max_t(size_t, size, r_xprt->rx_data.inline_wsize);
-	rb = rpcrdma_alloc_regbuf(min_size, DMA_TO_DEVICE, flags);
+	rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
 	if (IS_ERR(rb))
 		return false;
 
 	rpcrdma_free_regbuf(req->rl_sendbuf);
-	r_xprt->rx_stats.hardway_register_count += min_size;
+	r_xprt->rx_stats.hardway_register_count += size;
 	req->rl_sendbuf = rb;
 	return true;
 }
@@ -623,14 +614,15 @@ xprt_rdma_free(struct rpc_task *task)
 	struct rpc_rqst *rqst = task->tk_rqstp;
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
 	if (req->rl_backchannel)
 		return;
 
 	dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);
 
-	r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req,
-					    !RPC_IS_ASYNC(task));
-
+	ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
+	rpcrdma_unmap_sges(ia, req);
 	rpcrdma_buffer_put(req);
 }
net/sunrpc/xprtrdma/verbs.c
@@ -493,7 +493,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	unsigned int max_qp_wr;
 	int rc;
 
-	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
+	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_SEND_SGES) {
 		dprintk("RPC: %s: insufficient sge's available\n",
 			__func__);
 		return -ENOMEM;
@@ -522,7 +522,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
 	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
 	ep->rep_attr.cap.max_recv_wr += 1; /* drain cqe */
-	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
+	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_SEND_SGES;
 	ep->rep_attr.cap.max_recv_sge = 1;
 	ep->rep_attr.cap.max_inline_data = 0;
 	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
@@ -891,7 +891,7 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 	INIT_LIST_HEAD(&req->rl_registered);
 	req->rl_send_wr.next = NULL;
 	req->rl_send_wr.wr_cqe = &req->rl_cqe;
-	req->rl_send_wr.sg_list = req->rl_send_iov;
+	req->rl_send_wr.sg_list = req->rl_send_sge;
 	req->rl_send_wr.opcode = IB_WR_SEND;
 	return req;
 }
@@ -1306,11 +1306,9 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 		struct rpcrdma_ep *ep,
 		struct rpcrdma_req *req)
 {
-	struct ib_device *device = ia->ri_device;
 	struct ib_send_wr *send_wr = &req->rl_send_wr;
 	struct ib_send_wr *send_wr_fail;
-	struct ib_sge *sge = req->rl_send_iov;
-	int i, rc;
+	int rc;
 
 	if (req->rl_reply) {
 		rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
@@ -1319,9 +1317,6 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 		req->rl_reply = NULL;
 	}
 
-	for (i = 0; i < send_wr->num_sge; i++)
-		ib_dma_sync_single_for_device(device, sge[i].addr,
-					      sge[i].length, DMA_TO_DEVICE);
 	dprintk("RPC: %s: posting %d s/g entries\n",
 		__func__, send_wr->num_sge);
 
net/sunrpc/xprtrdma/xprt_rdma.h
@@ -285,16 +285,27 @@ struct rpcrdma_mr_seg {	/* chunk descriptors */
 	char		*mr_offset;	/* kva if no page, else offset */
 };
 
-#define RPCRDMA_MAX_IOVS	(2)
+/* Reserve enough Send SGEs to send a maximum size inline request:
+ * - RPC-over-RDMA header
+ * - xdr_buf head iovec
+ * - RPCRDMA_MAX_INLINE bytes, possibly unaligned, in pages
+ * - xdr_buf tail iovec
+ */
+enum {
+	RPCRDMA_MAX_SEND_PAGES = PAGE_SIZE + RPCRDMA_MAX_INLINE - 1,
+	RPCRDMA_MAX_PAGE_SGES = (RPCRDMA_MAX_SEND_PAGES >> PAGE_SHIFT) + 1,
+	RPCRDMA_MAX_SEND_SGES = 1 + 1 + RPCRDMA_MAX_PAGE_SGES + 1,
+};
 
 struct rpcrdma_buffer;
 struct rpcrdma_req {
 	struct list_head	rl_free;
+	unsigned int		rl_mapped_sges;
 	unsigned int		rl_connect_cookie;
 	struct rpcrdma_buffer	*rl_buffer;
 	struct rpcrdma_rep	*rl_reply;
 	struct ib_send_wr	rl_send_wr;
-	struct ib_sge		rl_send_iov[RPCRDMA_MAX_IOVS];
+	struct ib_sge		rl_send_sge[RPCRDMA_MAX_SEND_SGES];
 	struct rpcrdma_regbuf	*rl_rdmabuf;	/* xprt header */
 	struct rpcrdma_regbuf	*rl_sendbuf;	/* rq_snd_buf */
 	struct rpcrdma_regbuf	*rl_recvbuf;	/* rq_rcv_buf */
@@ -529,6 +540,18 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *);
 /*
  * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
  */
+
+enum rpcrdma_chunktype {
+	rpcrdma_noch = 0,
+	rpcrdma_readch,
+	rpcrdma_areadch,
+	rpcrdma_writech,
+	rpcrdma_replych
+};
+
+bool rpcrdma_prepare_send_sges(struct rpcrdma_ia *, struct rpcrdma_req *,
+			       u32, struct xdr_buf *, enum rpcrdma_chunktype);
+void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *);
 int rpcrdma_marshal_req(struct rpc_rqst *);
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
 
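
As a sanity check of the Send SGE budget added above: assuming 4KB
pages and, purely for illustration, an RPCRDMA_MAX_INLINE of 4096
bytes (the kernel's actual value may differ), the arithmetic yields
five SGEs.

#include <assert.h>

/* Illustrative values only; not the kernel's definitions. */
enum {
	FAKE_PAGE_SHIFT	= 12,
	FAKE_PAGE_SIZE	= 1 << FAKE_PAGE_SHIFT,			/* 4096 */
	FAKE_MAX_INLINE	= 4096,

	FAKE_MAX_SEND_PAGES = FAKE_PAGE_SIZE + FAKE_MAX_INLINE - 1,	/* 8191 */
	FAKE_MAX_PAGE_SGES  = (FAKE_MAX_SEND_PAGES >> FAKE_PAGE_SHIFT) + 1,
	/* header + head iovec + payload pages + tail iovec */
	FAKE_MAX_SEND_SGES  = 1 + 1 + FAKE_MAX_PAGE_SGES + 1,
};

int main(void)
{
	/* An unaligned 4096-byte payload spans at most two pages. */
	assert(FAKE_MAX_PAGE_SGES == 2);
	assert(FAKE_MAX_SEND_SGES == 5);
	return 0;
}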