Commit b78de1dc, authored by Chuck Lever and committed by Anna Schumaker

xprtrdma: Allocate and map transport header buffers at connect time

Currently the underlying RDMA device is chosen at transport set-up
time, but it will soon be chosen at connect time instead.

The maximum size of a transport header is based on device
capabilities. Thus transport header buffers have to be allocated
_after_ the underlying device has been chosen (via address and route
resolution); i.e., in the connect worker.

Thus, move the allocation of transport header buffers to the connect
worker, after the point at which the underlying RDMA device has been
chosen.

This also means the RDMA device is available to do a DMA mapping of
these buffers at connect time, instead of in the hot I/O path. Make
that optimization as well.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
parent 25868e61
...@@ -194,6 +194,10 @@ static struct rpc_rqst *rpcrdma_bc_rqst_get(struct rpcrdma_xprt *r_xprt) ...@@ -194,6 +194,10 @@ static struct rpc_rqst *rpcrdma_bc_rqst_get(struct rpcrdma_xprt *r_xprt)
req = rpcrdma_req_create(r_xprt, size, GFP_KERNEL); req = rpcrdma_req_create(r_xprt, size, GFP_KERNEL);
if (!req) if (!req)
return NULL; return NULL;
if (rpcrdma_req_setup(r_xprt, req)) {
rpcrdma_req_destroy(req);
return NULL;
}
xprt->bc_alloc_count++; xprt->bc_alloc_count++;
rqst = &req->rl_slot; rqst = &req->rl_slot;
......
...@@ -580,22 +580,19 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc) ...@@ -580,22 +580,19 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
/* Prepare an SGE for the RPC-over-RDMA transport header. /* Prepare an SGE for the RPC-over-RDMA transport header.
*/ */
static bool rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt, static void rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req, u32 len) struct rpcrdma_req *req, u32 len)
{ {
struct rpcrdma_sendctx *sc = req->rl_sendctx; struct rpcrdma_sendctx *sc = req->rl_sendctx;
struct rpcrdma_regbuf *rb = req->rl_rdmabuf; struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++]; struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
return false;
sge->addr = rdmab_addr(rb); sge->addr = rdmab_addr(rb);
sge->length = len; sge->length = len;
sge->lkey = rdmab_lkey(rb); sge->lkey = rdmab_lkey(rb);
ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length, ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
DMA_TO_DEVICE); DMA_TO_DEVICE);
return true;
} }
/* The head iovec is straightforward, as it is usually already /* The head iovec is straightforward, as it is usually already
...@@ -836,10 +833,9 @@ inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, ...@@ -836,10 +833,9 @@ inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
req->rl_wr.num_sge = 0; req->rl_wr.num_sge = 0;
req->rl_wr.opcode = IB_WR_SEND; req->rl_wr.opcode = IB_WR_SEND;
ret = -EIO; rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen);
if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
goto out_unmap;
ret = -EIO;
switch (rtype) { switch (rtype) {
case rpcrdma_noch_pullup: case rpcrdma_noch_pullup:
if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr)) if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr))
......
...@@ -78,6 +78,7 @@ static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt); ...@@ -78,6 +78,7 @@ static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt); static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt, static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_sendctx *sc); struct rpcrdma_sendctx *sc);
static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt); static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt); static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt); static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
...@@ -381,6 +382,8 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt) ...@@ -381,6 +382,8 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
* *
* Divest transport H/W resources associated with this adapter, * Divest transport H/W resources associated with this adapter,
* but allow it to be restored later. * but allow it to be restored later.
*
* Caller must hold the transport send lock.
*/ */
void void
rpcrdma_ia_remove(struct rpcrdma_ia *ia) rpcrdma_ia_remove(struct rpcrdma_ia *ia)
...@@ -388,8 +391,6 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia) ...@@ -388,8 +391,6 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt, struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
rx_ia); rx_ia);
struct rpcrdma_ep *ep = &r_xprt->rx_ep; struct rpcrdma_ep *ep = &r_xprt->rx_ep;
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_req *req;
/* This is similar to rpcrdma_ep_destroy, but: /* This is similar to rpcrdma_ep_destroy, but:
* - Don't cancel the connect worker. * - Don't cancel the connect worker.
...@@ -412,11 +413,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia) ...@@ -412,11 +413,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
* mappings and MRs are gone. * mappings and MRs are gone.
*/ */
rpcrdma_reps_unmap(r_xprt); rpcrdma_reps_unmap(r_xprt);
list_for_each_entry(req, &buf->rb_allreqs, rl_all) { rpcrdma_reqs_reset(r_xprt);
rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf);
rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
}
rpcrdma_mrs_destroy(r_xprt); rpcrdma_mrs_destroy(r_xprt);
rpcrdma_sendctxs_destroy(r_xprt); rpcrdma_sendctxs_destroy(r_xprt);
ib_dealloc_pd(ia->ri_pd); ib_dealloc_pd(ia->ri_pd);
...@@ -715,6 +712,11 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) ...@@ -715,6 +712,11 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
goto out; goto out;
} }
rc = rpcrdma_reqs_setup(r_xprt);
if (rc) {
rpcrdma_ep_disconnect(ep, ia);
goto out;
}
rpcrdma_mrs_create(r_xprt); rpcrdma_mrs_create(r_xprt);
out: out:
...@@ -996,32 +998,19 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size, ...@@ -996,32 +998,19 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
gfp_t flags) gfp_t flags)
{ {
struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req; struct rpcrdma_req *req;
size_t maxhdrsize;
req = kzalloc(sizeof(*req), flags); req = kzalloc(sizeof(*req), flags);
if (req == NULL) if (req == NULL)
goto out1; goto out1;
/* Compute maximum header buffer size in bytes */
maxhdrsize = rpcrdma_fixed_maxsz + 3 +
r_xprt->rx_ia.ri_max_rdma_segs * rpcrdma_readchunk_maxsz;
maxhdrsize *= sizeof(__be32);
rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
DMA_TO_DEVICE, flags);
if (!rb)
goto out2;
req->rl_rdmabuf = rb;
xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags); req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
if (!req->rl_sendbuf) if (!req->rl_sendbuf)
goto out3; goto out2;
req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags); req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
if (!req->rl_recvbuf) if (!req->rl_recvbuf)
goto out4; goto out3;
INIT_LIST_HEAD(&req->rl_free_mrs); INIT_LIST_HEAD(&req->rl_free_mrs);
INIT_LIST_HEAD(&req->rl_registered); INIT_LIST_HEAD(&req->rl_registered);
...@@ -1030,10 +1019,8 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size, ...@@ -1030,10 +1019,8 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
spin_unlock(&buffer->rb_lock); spin_unlock(&buffer->rb_lock);
return req; return req;
out4:
kfree(req->rl_sendbuf);
out3: out3:
kfree(req->rl_rdmabuf); kfree(req->rl_sendbuf);
out2: out2:
kfree(req); kfree(req);
out1: out1:
...@@ -1041,23 +1028,82 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size, ...@@ -1041,23 +1028,82 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
} }
/** /**
* rpcrdma_reqs_reset - Reset all reqs owned by a transport * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object
* @r_xprt: controlling transport instance * @r_xprt: controlling transport instance
* @req: rpcrdma_req object to set up
* *
* ASSUMPTION: the rb_allreqs list is stable for the duration, * Returns zero on success, and a negative errno on failure.
*/
int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
struct rpcrdma_regbuf *rb;
size_t maxhdrsize;
/* Compute maximum header buffer size in bytes */
maxhdrsize = rpcrdma_fixed_maxsz + 3 +
r_xprt->rx_ia.ri_max_rdma_segs * rpcrdma_readchunk_maxsz;
maxhdrsize *= sizeof(__be32);
rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
DMA_TO_DEVICE, GFP_KERNEL);
if (!rb)
goto out;
if (!__rpcrdma_regbuf_dma_map(r_xprt, rb))
goto out_free;
req->rl_rdmabuf = rb;
xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
return 0;
out_free:
rpcrdma_regbuf_free(rb);
out:
return -ENOMEM;
}
/* ASSUMPTION: the rb_allreqs list is stable for the duration,
* and thus can be walked without holding rb_lock. Eg. the * and thus can be walked without holding rb_lock. Eg. the
* caller is holding the transport send lock to exclude * caller is holding the transport send lock to exclude
* device removal or disconnection. * device removal or disconnection.
*/ */
static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt) static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt)
{ {
struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_req *req; struct rpcrdma_req *req;
int rc;
list_for_each_entry(req, &buf->rb_allreqs, rl_all) { list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
/* Credits are valid only for one connection */ rc = rpcrdma_req_setup(r_xprt, req);
req->rl_slot.rq_cong = 0; if (rc)
return rc;
} }
return 0;
}
static void rpcrdma_req_reset(struct rpcrdma_req *req)
{
/* Credits are valid for only one connection */
req->rl_slot.rq_cong = 0;
rpcrdma_regbuf_free(req->rl_rdmabuf);
req->rl_rdmabuf = NULL;
rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
}
/* ASSUMPTION: the rb_allreqs list is stable for the duration,
* and thus can be walked without holding rb_lock. Eg. the
* caller is holding the transport send lock to exclude
* device removal or disconnection.
*/
static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_req *req;
list_for_each_entry(req, &buf->rb_allreqs, rl_all)
rpcrdma_req_reset(req);
} }
static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
......
...@@ -478,6 +478,7 @@ void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp); ...@@ -478,6 +478,7 @@ void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
*/ */
struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size, struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
gfp_t flags); gfp_t flags);
int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
void rpcrdma_req_destroy(struct rpcrdma_req *req); void rpcrdma_req_destroy(struct rpcrdma_req *req);
int rpcrdma_buffer_create(struct rpcrdma_xprt *); int rpcrdma_buffer_create(struct rpcrdma_xprt *);
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment