Commit ecf85b23 authored by Chuck Lever's avatar Chuck Lever Committed by J. Bruce Fields

svcrdma: Introduce svc_rdma_recv_ctxt

svc_rdma_op_ctxt's are pre-allocated and maintained on a per-xprt
free list. This eliminates the overhead of calling kmalloc / kfree,
both of which grab a globally shared lock that disables interrupts.
To reduce contention further, separate the use of these objects in
the Receive and Send paths in svcrdma.

Subsequent patches will take advantage of this separation by
allocating real resources which are then cached in these objects.
The allocations are freed when the transport is torn down.

I've renamed the structure so that static type checking can be used
to ensure that uses of op_ctxt and recv_ctxt are not confused. As an
additional clean up, structure fields are renamed to conform with
kernel coding conventions.

As a final clean up, helpers related to recv_ctxt are moved closer
to the functions that use them.
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
Signed-off-by: default avatarJ. Bruce Fields <bfields@redhat.com>
parent bd2abef3
......@@ -128,6 +128,9 @@ struct svcxprt_rdma {
unsigned long sc_flags;
struct list_head sc_read_complete_q;
struct work_struct sc_work;
spinlock_t sc_recv_lock;
struct list_head sc_recv_ctxts;
};
/* sc_flags */
#define RDMAXPRT_CONN_PENDING 3
......@@ -142,6 +145,19 @@ struct svcxprt_rdma {
#define RPCSVC_MAXPAYLOAD_RDMA RPCSVC_MAXPAYLOAD
struct svc_rdma_recv_ctxt {
struct list_head rc_list;
struct ib_recv_wr rc_recv_wr;
struct ib_cqe rc_cqe;
struct xdr_buf rc_arg;
u32 rc_byte_len;
unsigned int rc_page_count;
unsigned int rc_hdr_count;
struct ib_sge rc_sges[1 +
RPCRDMA_MAX_INLINE_THRESH / PAGE_SIZE];
struct page *rc_pages[RPCSVC_MAXPAGES];
};
/* Track DMA maps for this transport and context */
static inline void svc_rdma_count_mappings(struct svcxprt_rdma *rdma,
struct svc_rdma_op_ctxt *ctxt)
......@@ -155,13 +171,19 @@ extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
struct xdr_buf *rcvbuf);
/* svc_rdma_recvfrom.c */
extern void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma);
extern bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma);
extern void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
struct svc_rdma_recv_ctxt *ctxt,
int free_pages);
extern void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma);
extern int svc_rdma_recvfrom(struct svc_rqst *);
/* svc_rdma_rw.c */
extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma);
extern int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma,
struct svc_rqst *rqstp,
struct svc_rdma_op_ctxt *head, __be32 *p);
struct svc_rdma_recv_ctxt *head, __be32 *p);
extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
__be32 *wr_ch, struct xdr_buf *xdr);
extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
......
This diff is collapsed.
// SPDX-License-Identifier: GPL-2.0
/*
* Copyright (c) 2016 Oracle. All rights reserved.
* Copyright (c) 2016-2018 Oracle. All rights reserved.
*
* Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
*/
......@@ -227,7 +227,7 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
/* State for pulling a Read chunk.
*/
struct svc_rdma_read_info {
struct svc_rdma_op_ctxt *ri_readctxt;
struct svc_rdma_recv_ctxt *ri_readctxt;
unsigned int ri_position;
unsigned int ri_pageno;
unsigned int ri_pageoff;
......@@ -282,10 +282,10 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
pr_err("svcrdma: read ctx: %s (%u/0x%x)\n",
ib_wc_status_msg(wc->status),
wc->status, wc->vendor_err);
svc_rdma_put_context(info->ri_readctxt, 1);
svc_rdma_recv_ctxt_put(rdma, info->ri_readctxt, 1);
} else {
spin_lock(&rdma->sc_rq_dto_lock);
list_add_tail(&info->ri_readctxt->list,
list_add_tail(&info->ri_readctxt->rc_list,
&rdma->sc_read_complete_q);
spin_unlock(&rdma->sc_rq_dto_lock);
......@@ -607,7 +607,7 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
struct svc_rqst *rqstp,
u32 rkey, u32 len, u64 offset)
{
struct svc_rdma_op_ctxt *head = info->ri_readctxt;
struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
struct svc_rdma_rw_ctxt *ctxt;
unsigned int sge_no, seg_len;
......@@ -625,10 +625,10 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
seg_len = min_t(unsigned int, len,
PAGE_SIZE - info->ri_pageoff);
head->arg.pages[info->ri_pageno] =
head->rc_arg.pages[info->ri_pageno] =
rqstp->rq_pages[info->ri_pageno];
if (!info->ri_pageoff)
head->count++;
head->rc_page_count++;
sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
seg_len, info->ri_pageoff);
......@@ -705,9 +705,9 @@ static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
}
/* Construct RDMA Reads to pull over a normal Read chunk. The chunk
* data lands in the page list of head->arg.pages.
* data lands in the page list of head->rc_arg.pages.
*
* Currently NFSD does not look at the head->arg.tail[0] iovec.
* Currently NFSD does not look at the head->rc_arg.tail[0] iovec.
* Therefore, XDR round-up of the Read chunk and trailing
* inline content must both be added at the end of the pagelist.
*/
......@@ -715,10 +715,10 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
struct svc_rdma_read_info *info,
__be32 *p)
{
struct svc_rdma_op_ctxt *head = info->ri_readctxt;
struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
int ret;
info->ri_pageno = head->hdr_count;
info->ri_pageno = head->rc_hdr_count;
info->ri_pageoff = 0;
ret = svc_rdma_build_read_chunk(rqstp, info, p);
......@@ -732,11 +732,11 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
* chunk is not included in either the pagelist or in
* the tail.
*/
head->arg.tail[0].iov_base =
head->arg.head[0].iov_base + info->ri_position;
head->arg.tail[0].iov_len =
head->arg.head[0].iov_len - info->ri_position;
head->arg.head[0].iov_len = info->ri_position;
head->rc_arg.tail[0].iov_base =
head->rc_arg.head[0].iov_base + info->ri_position;
head->rc_arg.tail[0].iov_len =
head->rc_arg.head[0].iov_len - info->ri_position;
head->rc_arg.head[0].iov_len = info->ri_position;
/* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
*
......@@ -749,9 +749,9 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
*/
info->ri_chunklen = XDR_QUADLEN(info->ri_chunklen) << 2;
head->arg.page_len = info->ri_chunklen;
head->arg.len += info->ri_chunklen;
head->arg.buflen += info->ri_chunklen;
head->rc_arg.page_len = info->ri_chunklen;
head->rc_arg.len += info->ri_chunklen;
head->rc_arg.buflen += info->ri_chunklen;
out:
return ret;
......@@ -760,7 +760,7 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
/* Construct RDMA Reads to pull over a Position Zero Read chunk.
* The start of the data lands in the first page just after
* the Transport header, and the rest lands in the page list of
* head->arg.pages.
* head->rc_arg.pages.
*
* Assumptions:
* - A PZRC has an XDR-aligned length (no implicit round-up).
......@@ -772,11 +772,11 @@ static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
struct svc_rdma_read_info *info,
__be32 *p)
{
struct svc_rdma_op_ctxt *head = info->ri_readctxt;
struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
int ret;
info->ri_pageno = head->hdr_count - 1;
info->ri_pageoff = offset_in_page(head->byte_len);
info->ri_pageno = head->rc_hdr_count - 1;
info->ri_pageoff = offset_in_page(head->rc_byte_len);
ret = svc_rdma_build_read_chunk(rqstp, info, p);
if (ret < 0)
......@@ -784,22 +784,22 @@ static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
trace_svcrdma_encode_pzr(info->ri_chunklen);
head->arg.len += info->ri_chunklen;
head->arg.buflen += info->ri_chunklen;
head->rc_arg.len += info->ri_chunklen;
head->rc_arg.buflen += info->ri_chunklen;
if (head->arg.buflen <= head->sge[0].length) {
if (head->rc_arg.buflen <= head->rc_sges[0].length) {
/* Transport header and RPC message fit entirely
* in page where head iovec resides.
*/
head->arg.head[0].iov_len = info->ri_chunklen;
head->rc_arg.head[0].iov_len = info->ri_chunklen;
} else {
/* Transport header and part of RPC message reside
* in the head iovec's page.
*/
head->arg.head[0].iov_len =
head->sge[0].length - head->byte_len;
head->arg.page_len =
info->ri_chunklen - head->arg.head[0].iov_len;
head->rc_arg.head[0].iov_len =
head->rc_sges[0].length - head->rc_byte_len;
head->rc_arg.page_len =
info->ri_chunklen - head->rc_arg.head[0].iov_len;
}
out:
......@@ -824,24 +824,24 @@ static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
* - All Read segments in @p have the same Position value.
*/
int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
struct svc_rdma_op_ctxt *head, __be32 *p)
struct svc_rdma_recv_ctxt *head, __be32 *p)
{
struct svc_rdma_read_info *info;
struct page **page;
int ret;
/* The request (with page list) is constructed in
* head->arg. Pages involved with RDMA Read I/O are
* head->rc_arg. Pages involved with RDMA Read I/O are
* transferred there.
*/
head->hdr_count = head->count;
head->arg.head[0] = rqstp->rq_arg.head[0];
head->arg.tail[0] = rqstp->rq_arg.tail[0];
head->arg.pages = head->pages;
head->arg.page_base = 0;
head->arg.page_len = 0;
head->arg.len = rqstp->rq_arg.len;
head->arg.buflen = rqstp->rq_arg.buflen;
head->rc_hdr_count = head->rc_page_count;
head->rc_arg.head[0] = rqstp->rq_arg.head[0];
head->rc_arg.tail[0] = rqstp->rq_arg.tail[0];
head->rc_arg.pages = head->rc_pages;
head->rc_arg.page_base = 0;
head->rc_arg.page_len = 0;
head->rc_arg.len = rqstp->rq_arg.len;
head->rc_arg.buflen = rqstp->rq_arg.buflen;
info = svc_rdma_read_info_alloc(rdma);
if (!info)
......@@ -867,7 +867,7 @@ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
out:
/* Read sink pages have been moved from rqstp->rq_pages to
* head->arg.pages. Force svc_recv to refill those slots
* head->rc_arg.pages. Force svc_recv to refill those slots
* in rq_pages.
*/
for (page = rqstp->rq_pages; page < rqstp->rq_respages; page++)
......
// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
* Copyright (c) 2016 Oracle. All rights reserved.
* Copyright (c) 2016-2018 Oracle. All rights reserved.
* Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
* Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
*
......
......@@ -63,7 +63,6 @@
#define RPCDBG_FACILITY RPCDBG_SVCXPRT
static int svc_rdma_post_recv(struct svcxprt_rdma *xprt);
static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
struct net *net);
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
......@@ -175,11 +174,7 @@ static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
{
unsigned int i;
/* Each RPC/RDMA credit can consume one Receive and
* one Send WQE at the same time.
*/
i = xprt->sc_sq_depth + xprt->sc_rq_depth;
i = xprt->sc_sq_depth;
while (i--) {
struct svc_rdma_op_ctxt *ctxt;
......@@ -297,54 +292,6 @@ static void qp_event_handler(struct ib_event *event, void *context)
}
}
/**
* svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
* @cq: completion queue
* @wc: completed WR
*
*/
static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
struct svcxprt_rdma *xprt = cq->cq_context;
struct ib_cqe *cqe = wc->wr_cqe;
struct svc_rdma_op_ctxt *ctxt;
trace_svcrdma_wc_receive(wc);
/* WARNING: Only wc->wr_cqe and wc->status are reliable */
ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
svc_rdma_unmap_dma(ctxt);
if (wc->status != IB_WC_SUCCESS)
goto flushed;
/* All wc fields are now known to be valid */
ctxt->byte_len = wc->byte_len;
spin_lock(&xprt->sc_rq_dto_lock);
list_add_tail(&ctxt->list, &xprt->sc_rq_dto_q);
spin_unlock(&xprt->sc_rq_dto_lock);
svc_rdma_post_recv(xprt);
set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
goto out;
goto out_enqueue;
flushed:
if (wc->status != IB_WC_WR_FLUSH_ERR)
pr_err("svcrdma: Recv: %s (%u/0x%x)\n",
ib_wc_status_msg(wc->status),
wc->status, wc->vendor_err);
set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
svc_rdma_put_context(ctxt, 1);
out_enqueue:
svc_xprt_enqueue(&xprt->sc_xprt);
out:
svc_xprt_put(&xprt->sc_xprt);
}
/**
* svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
* @cq: completion queue
......@@ -392,12 +339,14 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
INIT_LIST_HEAD(&cma_xprt->sc_recv_ctxts);
INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
init_waitqueue_head(&cma_xprt->sc_send_wait);
spin_lock_init(&cma_xprt->sc_lock);
spin_lock_init(&cma_xprt->sc_rq_dto_lock);
spin_lock_init(&cma_xprt->sc_ctxt_lock);
spin_lock_init(&cma_xprt->sc_recv_lock);
spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
/*
......@@ -411,63 +360,6 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
return cma_xprt;
}
static int
svc_rdma_post_recv(struct svcxprt_rdma *xprt)
{
struct ib_recv_wr recv_wr, *bad_recv_wr;
struct svc_rdma_op_ctxt *ctxt;
struct page *page;
dma_addr_t pa;
int sge_no;
int buflen;
int ret;
ctxt = svc_rdma_get_context(xprt);
buflen = 0;
ctxt->direction = DMA_FROM_DEVICE;
ctxt->cqe.done = svc_rdma_wc_receive;
for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
if (sge_no >= xprt->sc_max_sge) {
pr_err("svcrdma: Too many sges (%d)\n", sge_no);
goto err_put_ctxt;
}
page = alloc_page(GFP_KERNEL);
if (!page)
goto err_put_ctxt;
ctxt->pages[sge_no] = page;
pa = ib_dma_map_page(xprt->sc_cm_id->device,
page, 0, PAGE_SIZE,
DMA_FROM_DEVICE);
if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
goto err_put_ctxt;
svc_rdma_count_mappings(xprt, ctxt);
ctxt->sge[sge_no].addr = pa;
ctxt->sge[sge_no].length = PAGE_SIZE;
ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
ctxt->count = sge_no + 1;
buflen += PAGE_SIZE;
}
recv_wr.next = NULL;
recv_wr.sg_list = &ctxt->sge[0];
recv_wr.num_sge = ctxt->count;
recv_wr.wr_cqe = &ctxt->cqe;
svc_xprt_get(&xprt->sc_xprt);
ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
trace_svcrdma_post_recv(&recv_wr, ret);
if (ret) {
svc_rdma_unmap_dma(ctxt);
svc_rdma_put_context(ctxt, 1);
svc_xprt_put(&xprt->sc_xprt);
}
return ret;
err_put_ctxt:
svc_rdma_unmap_dma(ctxt);
svc_rdma_put_context(ctxt, 1);
return -ENOMEM;
}
static void
svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt,
struct rdma_conn_param *param)
......@@ -698,7 +590,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
struct ib_qp_init_attr qp_attr;
struct ib_device *dev;
struct sockaddr *sap;
unsigned int i, ctxts;
unsigned int ctxts;
int ret = 0;
listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
......@@ -803,14 +695,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
!rdma_ib_or_roce(dev, newxprt->sc_port_num))
goto errout;
/* Post receive buffers */
for (i = 0; i < newxprt->sc_max_requests; i++) {
ret = svc_rdma_post_recv(newxprt);
if (ret) {
dprintk("svcrdma: failure posting receive buffers\n");
goto errout;
}
}
if (!svc_rdma_post_recvs(newxprt))
goto errout;
/* Swap out the handler */
newxprt->sc_cm_id->event_handler = rdma_cma_handler;
......@@ -907,20 +793,7 @@ static void __svc_rdma_free(struct work_struct *work)
pr_err("svcrdma: sc_xprt still in use? (%d)\n",
kref_read(&xprt->xpt_ref));
while (!list_empty(&rdma->sc_read_complete_q)) {
struct svc_rdma_op_ctxt *ctxt;
ctxt = list_first_entry(&rdma->sc_read_complete_q,
struct svc_rdma_op_ctxt, list);
list_del(&ctxt->list);
svc_rdma_put_context(ctxt, 1);
}
while (!list_empty(&rdma->sc_rq_dto_q)) {
struct svc_rdma_op_ctxt *ctxt;
ctxt = list_first_entry(&rdma->sc_rq_dto_q,
struct svc_rdma_op_ctxt, list);
list_del(&ctxt->list);
svc_rdma_put_context(ctxt, 1);
}
svc_rdma_flush_recv_queues(rdma);
/* Warn if we leaked a resource or under-referenced */
if (rdma->sc_ctxt_used != 0)
......@@ -935,6 +808,7 @@ static void __svc_rdma_free(struct work_struct *work)
svc_rdma_destroy_rw_ctxts(rdma);
svc_rdma_destroy_ctxts(rdma);
svc_rdma_recv_ctxts_destroy(rdma);
/* Destroy the QP if present (not a listener) */
if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment