Commit 8bd5ba86 authored by Chuck Lever, committed by J. Bruce Fields

svcrdma: Use new CQ API for RPC-over-RDMA server receive CQs

Calling ib_poll_cq() to sort through WCs during a completion is a
common pattern amongst RDMA consumers. Since commit 14d3a3b2
("IB: add a proper completion queue abstraction"), WC sorting can
be handled by the IB core.

By converting to this new API, svcrdma is made a better neighbor to
other RDMA consumers, as it allows the core to schedule the delivery
of completions more fairly amongst all active consumers.

Because each ib_cqe carries a pointer to a completion method, the
core can now post operations on a consumer's QP, and handle the
completions itself.

svcrdma receive completions no longer use the dto_tasklet. Each
polled Receive WC is now handled individually in soft IRQ context.

The server transport's rdma_stat_rq_poll and rdma_stat_rq_prod
metrics are no longer updated.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
parent ec705fd4
...@@ -75,6 +75,7 @@ struct svc_rdma_op_ctxt { ...@@ -75,6 +75,7 @@ struct svc_rdma_op_ctxt {
struct svc_rdma_fastreg_mr *frmr; struct svc_rdma_fastreg_mr *frmr;
int hdr_count; int hdr_count;
struct xdr_buf arg; struct xdr_buf arg;
struct ib_cqe cqe;
struct list_head dto_q; struct list_head dto_q;
enum ib_wr_opcode wr_op; enum ib_wr_opcode wr_op;
enum ib_wc_status wc_status; enum ib_wc_status wc_status;
...@@ -174,7 +175,6 @@ struct svcxprt_rdma { ...@@ -174,7 +175,6 @@ struct svcxprt_rdma {
struct work_struct sc_work; struct work_struct sc_work;
}; };
/* sc_flags */ /* sc_flags */
#define RDMAXPRT_RQ_PENDING 1
#define RDMAXPRT_SQ_PENDING 2 #define RDMAXPRT_SQ_PENDING 2
#define RDMAXPRT_CONN_PENDING 3 #define RDMAXPRT_CONN_PENDING 3
......
...@@ -68,7 +68,6 @@ static void svc_rdma_detach(struct svc_xprt *xprt); ...@@ -68,7 +68,6 @@ static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt); static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt); static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static int svc_rdma_secure_port(struct svc_rqst *); static int svc_rdma_secure_port(struct svc_rqst *);
static void rq_cq_reap(struct svcxprt_rdma *xprt);
static void sq_cq_reap(struct svcxprt_rdma *xprt); static void sq_cq_reap(struct svcxprt_rdma *xprt);
static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL); static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
...@@ -413,7 +412,6 @@ static void dto_tasklet_func(unsigned long data) ...@@ -413,7 +412,6 @@ static void dto_tasklet_func(unsigned long data)
list_del_init(&xprt->sc_dto_q); list_del_init(&xprt->sc_dto_q);
spin_unlock_irqrestore(&dto_lock, flags); spin_unlock_irqrestore(&dto_lock, flags);
rq_cq_reap(xprt);
sq_cq_reap(xprt); sq_cq_reap(xprt);
svc_xprt_put(&xprt->sc_xprt); svc_xprt_put(&xprt->sc_xprt);
...@@ -422,93 +420,48 @@ static void dto_tasklet_func(unsigned long data) ...@@ -422,93 +420,48 @@ static void dto_tasklet_func(unsigned long data)
spin_unlock_irqrestore(&dto_lock, flags); spin_unlock_irqrestore(&dto_lock, flags);
} }
/* /**
* Receive Queue Completion Handler * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
* @cq: completion queue
* @wc: completed WR
* *
* Since an RQ completion handler is called on interrupt context, we
* need to defer the handling of the I/O to a tasklet
*/ */
static void rq_comp_handler(struct ib_cq *cq, void *cq_context) static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{ {
struct svcxprt_rdma *xprt = cq_context; struct svcxprt_rdma *xprt = cq->cq_context;
unsigned long flags; struct ib_cqe *cqe = wc->wr_cqe;
struct svc_rdma_op_ctxt *ctxt;
/* Guard against unconditional flush call for destroyed QP */
if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
return;
/*
* Set the bit regardless of whether or not it's on the list
* because it may be on the list already due to an SQ
* completion.
*/
set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
/*
* If this transport is not already on the DTO transport queue,
* add it
*/
spin_lock_irqsave(&dto_lock, flags);
if (list_empty(&xprt->sc_dto_q)) {
svc_xprt_get(&xprt->sc_xprt);
list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
}
spin_unlock_irqrestore(&dto_lock, flags);
/* Tasklet does all the work to avoid irqsave locks. */ /* WARNING: Only wc->wr_cqe and wc->status are reliable */
tasklet_schedule(&dto_tasklet); ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
} ctxt->wc_status = wc->status;
svc_rdma_unmap_dma(ctxt);
/* if (wc->status != IB_WC_SUCCESS)
* rq_cq_reap - Process the RQ CQ. goto flushed;
*
* Take all completing WC off the CQE and enqueue the associated DTO
* context on the dto_q for the transport.
*
* Note that caller must hold a transport reference.
*/
static void rq_cq_reap(struct svcxprt_rdma *xprt)
{
int ret;
struct ib_wc wc;
struct svc_rdma_op_ctxt *ctxt = NULL;
if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) /* All wc fields are now known to be valid */
return; ctxt->byte_len = wc->byte_len;
spin_lock(&xprt->sc_rq_dto_lock);
list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
spin_unlock(&xprt->sc_rq_dto_lock);
ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP); set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
atomic_inc(&rdma_stat_rq_poll); if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
goto out;
svc_xprt_enqueue(&xprt->sc_xprt);
goto out;
while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) { flushed:
ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id; if (wc->status != IB_WC_WR_FLUSH_ERR)
ctxt->wc_status = wc.status; pr_warn("svcrdma: receive: %s (%u/0x%x)\n",
ctxt->byte_len = wc.byte_len; ib_wc_status_msg(wc->status),
svc_rdma_unmap_dma(ctxt); wc->status, wc->vendor_err);
if (wc.status != IB_WC_SUCCESS) {
/* Close the transport */
dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
svc_rdma_put_context(ctxt, 1); svc_rdma_put_context(ctxt, 1);
svc_xprt_put(&xprt->sc_xprt);
continue;
}
spin_lock_bh(&xprt->sc_rq_dto_lock);
list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
spin_unlock_bh(&xprt->sc_rq_dto_lock);
svc_xprt_put(&xprt->sc_xprt);
}
if (ctxt)
atomic_inc(&rdma_stat_rq_prod);
set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags); out:
/* svc_xprt_put(&xprt->sc_xprt);
* If data arrived before established event,
* don't enqueue. This defers RPC I/O until the
* RDMA connection is complete.
*/
if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
svc_xprt_enqueue(&xprt->sc_xprt);
} }
/* /*
...@@ -681,6 +634,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags) ...@@ -681,6 +634,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
ctxt = svc_rdma_get_context(xprt); ctxt = svc_rdma_get_context(xprt);
buflen = 0; buflen = 0;
ctxt->direction = DMA_FROM_DEVICE; ctxt->direction = DMA_FROM_DEVICE;
ctxt->cqe.done = svc_rdma_wc_receive;
for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) { for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
if (sge_no >= xprt->sc_max_sge) { if (sge_no >= xprt->sc_max_sge) {
pr_err("svcrdma: Too many sges (%d)\n", sge_no); pr_err("svcrdma: Too many sges (%d)\n", sge_no);
...@@ -705,7 +659,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags) ...@@ -705,7 +659,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
recv_wr.next = NULL; recv_wr.next = NULL;
recv_wr.sg_list = &ctxt->sge[0]; recv_wr.sg_list = &ctxt->sge[0];
recv_wr.num_sge = ctxt->count; recv_wr.num_sge = ctxt->count;
recv_wr.wr_id = (u64)(unsigned long)ctxt; recv_wr.wr_cqe = &ctxt->cqe;
svc_xprt_get(&xprt->sc_xprt); svc_xprt_get(&xprt->sc_xprt);
ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr); ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
...@@ -1094,12 +1048,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) ...@@ -1094,12 +1048,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
dprintk("svcrdma: error creating SQ CQ for connect request\n"); dprintk("svcrdma: error creating SQ CQ for connect request\n");
goto errout; goto errout;
} }
cq_attr.cqe = newxprt->sc_rq_depth; newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
newxprt->sc_rq_cq = ib_create_cq(dev, 0, IB_POLL_SOFTIRQ);
rq_comp_handler,
cq_event_handler,
newxprt,
&cq_attr);
if (IS_ERR(newxprt->sc_rq_cq)) { if (IS_ERR(newxprt->sc_rq_cq)) {
dprintk("svcrdma: error creating RQ CQ for connect request\n"); dprintk("svcrdma: error creating RQ CQ for connect request\n");
goto errout; goto errout;
...@@ -1193,7 +1143,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) ...@@ -1193,7 +1143,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
* miss the first message * miss the first message
*/ */
ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP); ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
/* Accept Connection */ /* Accept Connection */
set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags); set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
...@@ -1337,7 +1286,7 @@ static void __svc_rdma_free(struct work_struct *work) ...@@ -1337,7 +1286,7 @@ static void __svc_rdma_free(struct work_struct *work)
ib_destroy_cq(rdma->sc_sq_cq); ib_destroy_cq(rdma->sc_sq_cq);
if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq)) if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
ib_destroy_cq(rdma->sc_rq_cq); ib_free_cq(rdma->sc_rq_cq);
if (rdma->sc_pd && !IS_ERR(rdma->sc_pd)) if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
ib_dealloc_pd(rdma->sc_pd); ib_dealloc_pd(rdma->sc_pd);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment