Commit dbcc53a5 authored by Chuck Lever, committed by Anna Schumaker

xprtrdma: Clean up sendctx functions

Minor clean-ups I've stumbled on since sendctx was merged last year.
In particular, making Send completion processing more efficient
appears to have a measurable impact on IOPS throughput.

Note: test_and_clear_bit() returns a value and is therefore a fully
ordered atomic, so the explicit smp_mb__after_atomic() is not
necessary (see the short illustration after the commit metadata).
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
parent 17e4c443
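
The note above refers to the kernel's atomic bit-op ordering rules: RMW
bit operations that return a value, such as test_and_clear_bit(), are
fully ordered, so no separate barrier is needed around them. A minimal
before/after sketch of the pattern this commit changes (names taken
from the diff below):

        /* Before: explicit barrier between clearing the bit and waking
         * the waiter.
         */
        if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES,
                               &sc->sc_req->rl_flags)) {
                smp_mb__after_atomic();
                wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
        }

        /* After: test_and_clear_bit() is a value-returning atomic RMW
         * and already implies full ordering, so the barrier (and the
         * braces) can be dropped.
         */
        if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES,
                               &sc->sc_req->rl_flags))
                wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);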
@@ -508,30 +508,26 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 }
 
 /**
- * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
+ * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
  * @sc: sendctx containing SGEs to unmap
  *
  */
-void
-rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
+void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
 {
-	struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
 	struct ib_sge *sge;
-	unsigned int count;
 
 	/* The first two SGEs contain the transport header and
 	 * the inline buffer. These are always left mapped so
 	 * they can be cheaply re-used.
 	 */
-	sge = &sc->sc_sges[2];
-	for (count = sc->sc_unmap_count; count; ++sge, --count)
-		ib_dma_unmap_page(ia->ri_device,
-				  sge->addr, sge->length, DMA_TO_DEVICE);
+	for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
+	     ++sge, --sc->sc_unmap_count)
+		ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
+				  DMA_TO_DEVICE);
 
-	if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
-		smp_mb__after_atomic();
+	if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES,
+			       &sc->sc_req->rl_flags))
 		wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
-	}
 }
 
 /* Prepare an SGE for the RPC-over-RDMA transport header.
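
Putting the hunk back together, the renamed helper reads roughly as
follows after this commit (a reconstruction from the -/+ lines above;
whitespace may differ from the tree):

void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
{
        struct ib_sge *sge;

        /* The first two SGEs contain the transport header and
         * the inline buffer. These are always left mapped so
         * they can be cheaply re-used.
         */
        for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
             ++sge, --sc->sc_unmap_count)
                ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
                                  DMA_TO_DEVICE);

        if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES,
                               &sc->sc_req->rl_flags))
                wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
}

The device pointer now comes from sc->sc_device, which the next hunk
populates at DMA-map time (sc->sc_device = rdmab_device(rb)), so the
unmap path no longer has to reach through sc->sc_xprt->rx_ia to find
the ib_device.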
@@ -578,6 +574,7 @@ static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt,
 	 */
 	if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
 		goto out_regbuf;
+	sc->sc_device = rdmab_device(rb);
 	sge_no = 1;
 	sge[sge_no].addr = rdmab_addr(rb);
 	sge[sge_no].length = xdr->head[0].iov_len;
@@ -673,12 +670,12 @@ static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt,
 	return false;
 
 out_mapping_overflow:
-	rpcrdma_unmap_sendctx(sc);
+	rpcrdma_sendctx_unmap(sc);
 	pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
 	return false;
 
 out_mapping_err:
-	rpcrdma_unmap_sendctx(sc);
+	rpcrdma_sendctx_unmap(sc);
 	trace_xprtrdma_dma_maperr(sge[sge_no].addr);
 	return false;
 }
@@ -698,7 +695,7 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
 				  struct rpcrdma_req *req, u32 hdrlen,
 				  struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
 {
-	req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
+	req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
 	if (!req->rl_sendctx)
 		return -EAGAIN;
 	req->rl_sendctx->sc_wr.num_sge = 0;
@@ -870,20 +870,20 @@ static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
 
 /**
  * rpcrdma_sendctx_get_locked - Acquire a send context
- * @buf: transport buffers from which to acquire an unused context
+ * @r_xprt: controlling transport instance
  *
  * Returns pointer to a free send completion context; or NULL if
  * the queue is empty.
  *
 * Usage: Called to acquire an SGE array before preparing a Send WR.
  *
- * The caller serializes calls to this function (per rpcrdma_buffer),
- * and provides an effective memory barrier that flushes the new value
+ * The caller serializes calls to this function (per transport), and
+ * provides an effective memory barrier that flushes the new value
  * of rb_sc_head.
  */
-struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
+struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
 {
-	struct rpcrdma_xprt *r_xprt;
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpcrdma_sendctx *sc;
 	unsigned long next_head;
 
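
The rest of the function is elided from this hunk. Consistent with the
doc comment, the acquisition step can be pictured like this
(illustrative sketch only, using names that appear in the surrounding
hunks; the label and exact statements are assumptions, not verbatim
kernel code):

        next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
        if (next_head == buf->rb_sc_tail)       /* no free contexts */
                goto out_emptyq;                /* hypothetical label */

        /* ORDER: the context must be read _before_ rb_sc_head is
         * advanced; the caller's later lock release is the barrier
         * that publishes the new rb_sc_head value.
         */
        sc = buf->rb_sc_ctxs[next_head];
        buf->rb_sc_head = next_head;
        return sc;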
@@ -908,7 +908,6 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
 	 * backing up. Cause the caller to pause and try again.
 	 */
 	set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags);
-	r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
 	r_xprt->rx_stats.empty_sendctx_q++;
 	return NULL;
 }
@@ -920,7 +919,7 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
  * Usage: Called from Send completion to return a sendctxt
  * to the queue.
  *
- * The caller serializes calls to this function (per rpcrdma_buffer).
+ * The caller serializes calls to this function (per transport).
  */
 static void
 rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
@@ -928,7 +927,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
 	struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
 	unsigned long next_tail;
 
-	/* Unmap SGEs of previously completed by unsignaled
+	/* Unmap SGEs of previously completed but unsignaled
 	 * Sends by walking up the queue until @sc is found.
 	 */
 	next_tail = buf->rb_sc_tail;
@@ -936,7 +935,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
 		next_tail = rpcrdma_sendctx_next(buf, next_tail);
 
 		/* ORDER: item must be accessed _before_ tail is updated */
-		rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);
+		rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);
 	} while (buf->rb_sc_ctxs[next_tail] != sc);
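
Taken together, rpcrdma_sendctx_get_locked() and
rpcrdma_sendctx_put_locked() act as a single-producer/single-consumer
ring over pre-allocated contexts: the per-transport-serialized sender
advances rb_sc_head, while the Send completion path advances
rb_sc_tail, unmapping every unsignaled context it walks over. A
minimal stand-alone C model of that discipline (hypothetical names and
size, not kernel code):

#include <stdatomic.h>
#include <stddef.h>

#define RING_SIZE 8                     /* model value only */

struct ring {
        void *slots[RING_SIZE];         /* filled once at setup */
        atomic_size_t head;             /* moved only by the producer */
        atomic_size_t tail;             /* moved only by the consumer */
};

/* Producer: hand out the next slot, or NULL if none are free. */
static void *ring_get(struct ring *r)
{
        size_t head = atomic_load_explicit(&r->head, memory_order_relaxed);
        size_t next = (head + 1) % RING_SIZE;

        if (next == atomic_load_explicit(&r->tail, memory_order_acquire))
                return NULL;            /* the "empty sendctx queue" case */

        void *slot = r->slots[next];    /* read the slot before moving head */
        atomic_store_explicit(&r->head, next, memory_order_release);
        return slot;
}

/* Consumer: retire every slot up to and including @upto. */
static void ring_put(struct ring *r, void *upto)
{
        size_t tail = atomic_load_explicit(&r->tail, memory_order_relaxed);

        do {
                tail = (tail + 1) % RING_SIZE;
                /* "unmap" the slot here, before tail moves past it */
        } while (r->slots[tail] != upto);

        atomic_store_explicit(&r->tail, tail, memory_order_release);
}

In the kernel code the "ring full" case is reported as an empty sendctx
queue: rpcrdma_sendctx_get_locked() returns NULL, and
rpcrdma_prepare_send_sges() turns that into -EAGAIN so the request is
retried after Send completions have moved the tail forward.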
@@ -225,6 +225,7 @@ struct rpcrdma_xprt;
 struct rpcrdma_sendctx {
 	struct ib_send_wr sc_wr;
 	struct ib_cqe sc_cqe;
+	struct ib_device *sc_device;
 	struct rpcrdma_xprt *sc_xprt;
 	struct rpcrdma_req *sc_req;
 	unsigned int sc_unmap_count;
@@ -536,7 +537,7 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
 void rpcrdma_req_destroy(struct rpcrdma_req *req);
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
-struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf);
+struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt);
 struct rpcrdma_mr *rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt);
 void rpcrdma_mr_put(struct rpcrdma_mr *mr);
@@ -625,7 +626,7 @@ int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
 			      struct rpcrdma_req *req, u32 hdrlen,
 			      struct xdr_buf *xdr,
 			      enum rpcrdma_chunktype rtype);
-void rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc);
+void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc);
 int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);