Commit 347543e6 authored by Trond Myklebust

Merge tag 'nfs-rdma-for-5.3-1' of git://git.linux-nfs.org/projects/anna/linux-nfs

NFSoRDMA client updates for 5.3

New features:
- Add a way to place MRs back on the free list
- Reduce context switching
- Add new trace events

Bugfixes and cleanups:
- Fix a BUG when tracing is enabled with NFSv4.1
- Fix a use-after-free in rpcrdma_post_recvs
- Replace use of xdr_stream_pos in rpcrdma_marshal_req
- Fix occasional transport deadlock
- Fix show_nfs_errors macros, other tracing improvements
- Remove RPCRDMA_REQ_F_PENDING and fr_state
- Various simplifications and refactors
parents 80d3c45f 62a92ba9
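
The flag-based completion tracking removed by this series (RPCRDMA_REQ_F_PENDING, RPCRDMA_REQ_F_TX_RESOURCES, fr_state) is replaced in the diffs below by a kref held on the request: the Send completion and the Reply handler each drop a reference, and whichever drops the last one completes the RPC, so neither path has to sleep in wait_on_bit. The following is a minimal userland sketch of that pattern using C11 atomics; the names rpc_req, send_done, and reply_done are illustrative only and are not the kernel symbols.

#include <stdatomic.h>
#include <stdio.h>

/* One reference is held for the posted Send and one for the expected
 * Reply; whichever completion runs last finishes the RPC, so neither
 * path sleeps waiting for the other (illustrative names throughout).
 */
struct rpc_req {
	atomic_int refs;
};

static void rpc_complete(struct rpc_req *req)
{
	(void)req;
	printf("RPC done: both Send and Reply completions have fired\n");
}

static void req_put(struct rpc_req *req)
{
	/* fetch_sub returns the old value; 1 means this was the last ref */
	if (atomic_fetch_sub(&req->refs, 1) == 1)
		rpc_complete(req);
}

static void send_done(struct rpc_req *req)  { req_put(req); }  /* Send completion */
static void reply_done(struct rpc_req *req) { req_put(req); }  /* Reply handler   */

int main(void)
{
	struct rpc_req req;

	atomic_init(&req.refs, 2);	/* one ref for Send, one for Reply */

	/* Completion order does not matter, and no waiter is needed. */
	reply_done(&req);
	send_done(&req);
	return 0;
}

In the actual patches the same idea appears as the rl_kref field, with rpcrdma_sendctx_done and rpcrdma_reply_done as the kref release functions.
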
...@@ -414,27 +414,39 @@ static __be32 ...@@ -414,27 +414,39 @@ static __be32
validate_seqid(const struct nfs4_slot_table *tbl, const struct nfs4_slot *slot, validate_seqid(const struct nfs4_slot_table *tbl, const struct nfs4_slot *slot,
const struct cb_sequenceargs * args) const struct cb_sequenceargs * args)
{ {
__be32 ret;
ret = cpu_to_be32(NFS4ERR_BADSLOT);
if (args->csa_slotid > tbl->server_highest_slotid) if (args->csa_slotid > tbl->server_highest_slotid)
return htonl(NFS4ERR_BADSLOT); goto out_err;
/* Replay */ /* Replay */
if (args->csa_sequenceid == slot->seq_nr) { if (args->csa_sequenceid == slot->seq_nr) {
ret = cpu_to_be32(NFS4ERR_DELAY);
if (nfs4_test_locked_slot(tbl, slot->slot_nr)) if (nfs4_test_locked_slot(tbl, slot->slot_nr))
return htonl(NFS4ERR_DELAY); goto out_err;
/* Signal process_op to set this error on next op */ /* Signal process_op to set this error on next op */
ret = cpu_to_be32(NFS4ERR_RETRY_UNCACHED_REP);
if (args->csa_cachethis == 0) if (args->csa_cachethis == 0)
return htonl(NFS4ERR_RETRY_UNCACHED_REP); goto out_err;
/* Liar! We never allowed you to set csa_cachethis != 0 */ /* Liar! We never allowed you to set csa_cachethis != 0 */
return htonl(NFS4ERR_SEQ_FALSE_RETRY); ret = cpu_to_be32(NFS4ERR_SEQ_FALSE_RETRY);
goto out_err;
} }
/* Note: wraparound relies on seq_nr being of type u32 */ /* Note: wraparound relies on seq_nr being of type u32 */
if (likely(args->csa_sequenceid == slot->seq_nr + 1))
return htonl(NFS4_OK);
/* Misordered request */ /* Misordered request */
return htonl(NFS4ERR_SEQ_MISORDERED); ret = cpu_to_be32(NFS4ERR_SEQ_MISORDERED);
if (args->csa_sequenceid != slot->seq_nr + 1)
goto out_err;
return cpu_to_be32(NFS4_OK);
out_err:
trace_nfs4_cb_seqid_err(args, ret);
return ret;
} }
/* /*
......
...@@ -151,7 +151,7 @@ static int decode_stat(struct xdr_stream *xdr, enum nfs_stat *status) ...@@ -151,7 +151,7 @@ static int decode_stat(struct xdr_stream *xdr, enum nfs_stat *status)
return 0; return 0;
out_status: out_status:
*status = be32_to_cpup(p); *status = be32_to_cpup(p);
trace_nfs_xdr_status((int)*status); trace_nfs_xdr_status(xdr, (int)*status);
return 0; return 0;
} }
......
...@@ -343,7 +343,7 @@ static int decode_nfsstat3(struct xdr_stream *xdr, enum nfs_stat *status) ...@@ -343,7 +343,7 @@ static int decode_nfsstat3(struct xdr_stream *xdr, enum nfs_stat *status)
return 0; return 0;
out_status: out_status:
*status = be32_to_cpup(p); *status = be32_to_cpup(p);
trace_nfs_xdr_status((int)*status); trace_nfs_xdr_status(xdr, (int)*status);
return 0; return 0;
} }
......
...@@ -3187,7 +3187,7 @@ static bool __decode_op_hdr(struct xdr_stream *xdr, enum nfs_opnum4 expected, ...@@ -3187,7 +3187,7 @@ static bool __decode_op_hdr(struct xdr_stream *xdr, enum nfs_opnum4 expected,
return true; return true;
out_status: out_status:
nfserr = be32_to_cpup(p); nfserr = be32_to_cpup(p);
trace_nfs4_xdr_status(opnum, nfserr); trace_nfs4_xdr_status(xdr, opnum, nfserr);
*nfs_retval = nfs4_stat_to_errno(nfserr); *nfs_retval = nfs4_stat_to_errno(nfserr);
return true; return true;
out_bad_operation: out_bad_operation:
......
...@@ -335,6 +335,9 @@ struct xprt_class { ...@@ -335,6 +335,9 @@ struct xprt_class {
*/ */
struct rpc_xprt *xprt_create_transport(struct xprt_create *args); struct rpc_xprt *xprt_create_transport(struct xprt_create *args);
void xprt_connect(struct rpc_task *task); void xprt_connect(struct rpc_task *task);
unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt);
void xprt_reconnect_backoff(struct rpc_xprt *xprt,
unsigned long init_to);
void xprt_reserve(struct rpc_task *task); void xprt_reserve(struct rpc_task *task);
void xprt_retry_reserve(struct rpc_task *task); void xprt_retry_reserve(struct rpc_task *task);
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task); int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task);
......
...@@ -181,18 +181,6 @@ DECLARE_EVENT_CLASS(xprtrdma_wrch_event, ...@@ -181,18 +181,6 @@ DECLARE_EVENT_CLASS(xprtrdma_wrch_event,
), \ ), \
TP_ARGS(task, mr, nsegs)) TP_ARGS(task, mr, nsegs))
TRACE_DEFINE_ENUM(FRWR_IS_INVALID);
TRACE_DEFINE_ENUM(FRWR_IS_VALID);
TRACE_DEFINE_ENUM(FRWR_FLUSHED_FR);
TRACE_DEFINE_ENUM(FRWR_FLUSHED_LI);
#define xprtrdma_show_frwr_state(x) \
__print_symbolic(x, \
{ FRWR_IS_INVALID, "INVALID" }, \
{ FRWR_IS_VALID, "VALID" }, \
{ FRWR_FLUSHED_FR, "FLUSHED_FR" }, \
{ FRWR_FLUSHED_LI, "FLUSHED_LI" })
DECLARE_EVENT_CLASS(xprtrdma_frwr_done, DECLARE_EVENT_CLASS(xprtrdma_frwr_done,
TP_PROTO( TP_PROTO(
const struct ib_wc *wc, const struct ib_wc *wc,
...@@ -203,22 +191,19 @@ DECLARE_EVENT_CLASS(xprtrdma_frwr_done, ...@@ -203,22 +191,19 @@ DECLARE_EVENT_CLASS(xprtrdma_frwr_done,
TP_STRUCT__entry( TP_STRUCT__entry(
__field(const void *, mr) __field(const void *, mr)
__field(unsigned int, state)
__field(unsigned int, status) __field(unsigned int, status)
__field(unsigned int, vendor_err) __field(unsigned int, vendor_err)
), ),
TP_fast_assign( TP_fast_assign(
__entry->mr = container_of(frwr, struct rpcrdma_mr, frwr); __entry->mr = container_of(frwr, struct rpcrdma_mr, frwr);
__entry->state = frwr->fr_state;
__entry->status = wc->status; __entry->status = wc->status;
__entry->vendor_err = __entry->status ? wc->vendor_err : 0; __entry->vendor_err = __entry->status ? wc->vendor_err : 0;
), ),
TP_printk( TP_printk(
"mr=%p state=%s: %s (%u/0x%x)", "mr=%p: %s (%u/0x%x)",
__entry->mr, xprtrdma_show_frwr_state(__entry->state), __entry->mr, rdma_show_wc_status(__entry->status),
rdma_show_wc_status(__entry->status),
__entry->status, __entry->vendor_err __entry->status, __entry->vendor_err
) )
); );
...@@ -390,6 +375,37 @@ DEFINE_RXPRT_EVENT(xprtrdma_op_inject_dsc); ...@@ -390,6 +375,37 @@ DEFINE_RXPRT_EVENT(xprtrdma_op_inject_dsc);
DEFINE_RXPRT_EVENT(xprtrdma_op_close); DEFINE_RXPRT_EVENT(xprtrdma_op_close);
DEFINE_RXPRT_EVENT(xprtrdma_op_connect); DEFINE_RXPRT_EVENT(xprtrdma_op_connect);
TRACE_EVENT(xprtrdma_op_set_cto,
TP_PROTO(
const struct rpcrdma_xprt *r_xprt,
unsigned long connect,
unsigned long reconnect
),
TP_ARGS(r_xprt, connect, reconnect),
TP_STRUCT__entry(
__field(const void *, r_xprt)
__field(unsigned long, connect)
__field(unsigned long, reconnect)
__string(addr, rpcrdma_addrstr(r_xprt))
__string(port, rpcrdma_portstr(r_xprt))
),
TP_fast_assign(
__entry->r_xprt = r_xprt;
__entry->connect = connect;
__entry->reconnect = reconnect;
__assign_str(addr, rpcrdma_addrstr(r_xprt));
__assign_str(port, rpcrdma_portstr(r_xprt));
),
TP_printk("peer=[%s]:%s r_xprt=%p: connect=%lu reconnect=%lu",
__get_str(addr), __get_str(port), __entry->r_xprt,
__entry->connect / HZ, __entry->reconnect / HZ
)
);
TRACE_EVENT(xprtrdma_qp_event, TRACE_EVENT(xprtrdma_qp_event,
TP_PROTO( TP_PROTO(
const struct rpcrdma_xprt *r_xprt, const struct rpcrdma_xprt *r_xprt,
...@@ -470,13 +486,12 @@ TRACE_DEFINE_ENUM(rpcrdma_replych); ...@@ -470,13 +486,12 @@ TRACE_DEFINE_ENUM(rpcrdma_replych);
TRACE_EVENT(xprtrdma_marshal, TRACE_EVENT(xprtrdma_marshal,
TP_PROTO( TP_PROTO(
const struct rpc_rqst *rqst, const struct rpcrdma_req *req,
unsigned int hdrlen,
unsigned int rtype, unsigned int rtype,
unsigned int wtype unsigned int wtype
), ),
TP_ARGS(rqst, hdrlen, rtype, wtype), TP_ARGS(req, rtype, wtype),
TP_STRUCT__entry( TP_STRUCT__entry(
__field(unsigned int, task_id) __field(unsigned int, task_id)
...@@ -491,10 +506,12 @@ TRACE_EVENT(xprtrdma_marshal, ...@@ -491,10 +506,12 @@ TRACE_EVENT(xprtrdma_marshal,
), ),
TP_fast_assign( TP_fast_assign(
const struct rpc_rqst *rqst = &req->rl_slot;
__entry->task_id = rqst->rq_task->tk_pid; __entry->task_id = rqst->rq_task->tk_pid;
__entry->client_id = rqst->rq_task->tk_client->cl_clid; __entry->client_id = rqst->rq_task->tk_client->cl_clid;
__entry->xid = be32_to_cpu(rqst->rq_xid); __entry->xid = be32_to_cpu(rqst->rq_xid);
__entry->hdrlen = hdrlen; __entry->hdrlen = req->rl_hdrbuf.len;
__entry->headlen = rqst->rq_snd_buf.head[0].iov_len; __entry->headlen = rqst->rq_snd_buf.head[0].iov_len;
__entry->pagelen = rqst->rq_snd_buf.page_len; __entry->pagelen = rqst->rq_snd_buf.page_len;
__entry->taillen = rqst->rq_snd_buf.tail[0].iov_len; __entry->taillen = rqst->rq_snd_buf.tail[0].iov_len;
...@@ -538,6 +555,33 @@ TRACE_EVENT(xprtrdma_marshal_failed, ...@@ -538,6 +555,33 @@ TRACE_EVENT(xprtrdma_marshal_failed,
) )
); );
TRACE_EVENT(xprtrdma_prepsend_failed,
TP_PROTO(const struct rpc_rqst *rqst,
int ret
),
TP_ARGS(rqst, ret),
TP_STRUCT__entry(
__field(unsigned int, task_id)
__field(unsigned int, client_id)
__field(u32, xid)
__field(int, ret)
),
TP_fast_assign(
__entry->task_id = rqst->rq_task->tk_pid;
__entry->client_id = rqst->rq_task->tk_client->cl_clid;
__entry->xid = be32_to_cpu(rqst->rq_xid);
__entry->ret = ret;
),
TP_printk("task:%u@%u xid=0x%08x: ret=%d",
__entry->task_id, __entry->client_id, __entry->xid,
__entry->ret
)
);
TRACE_EVENT(xprtrdma_post_send, TRACE_EVENT(xprtrdma_post_send,
TP_PROTO( TP_PROTO(
const struct rpcrdma_req *req, const struct rpcrdma_req *req,
...@@ -559,7 +603,8 @@ TRACE_EVENT(xprtrdma_post_send, ...@@ -559,7 +603,8 @@ TRACE_EVENT(xprtrdma_post_send,
const struct rpc_rqst *rqst = &req->rl_slot; const struct rpc_rqst *rqst = &req->rl_slot;
__entry->task_id = rqst->rq_task->tk_pid; __entry->task_id = rqst->rq_task->tk_pid;
__entry->client_id = rqst->rq_task->tk_client->cl_clid; __entry->client_id = rqst->rq_task->tk_client ?
rqst->rq_task->tk_client->cl_clid : -1;
__entry->req = req; __entry->req = req;
__entry->num_sge = req->rl_sendctx->sc_wr.num_sge; __entry->num_sge = req->rl_sendctx->sc_wr.num_sge;
__entry->signaled = req->rl_sendctx->sc_wr.send_flags & __entry->signaled = req->rl_sendctx->sc_wr.send_flags &
...@@ -698,6 +743,7 @@ TRACE_EVENT(xprtrdma_wc_receive, ...@@ -698,6 +743,7 @@ TRACE_EVENT(xprtrdma_wc_receive,
DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_fastreg); DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_fastreg);
DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_li); DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_li);
DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_li_wake); DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_li_wake);
DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_li_done);
TRACE_EVENT(xprtrdma_frwr_alloc, TRACE_EVENT(xprtrdma_frwr_alloc,
TP_PROTO( TP_PROTO(
......
...@@ -59,6 +59,7 @@ static struct rpc_wait_queue delay_queue; ...@@ -59,6 +59,7 @@ static struct rpc_wait_queue delay_queue;
*/ */
struct workqueue_struct *rpciod_workqueue __read_mostly; struct workqueue_struct *rpciod_workqueue __read_mostly;
struct workqueue_struct *xprtiod_workqueue __read_mostly; struct workqueue_struct *xprtiod_workqueue __read_mostly;
EXPORT_SYMBOL_GPL(xprtiod_workqueue);
unsigned long unsigned long
rpc_task_timeout(const struct rpc_task *task) rpc_task_timeout(const struct rpc_task *task)
......
...@@ -846,6 +846,38 @@ void xprt_connect(struct rpc_task *task) ...@@ -846,6 +846,38 @@ void xprt_connect(struct rpc_task *task)
xprt_release_write(xprt, task); xprt_release_write(xprt, task);
} }
/**
* xprt_reconnect_delay - compute the wait before scheduling a connect
* @xprt: transport instance
*
*/
unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
{
unsigned long start, now = jiffies;
start = xprt->stat.connect_start + xprt->reestablish_timeout;
if (time_after(start, now))
return start - now;
return 0;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_delay);
/**
* xprt_reconnect_backoff - compute the new re-establish timeout
* @xprt: transport instance
* @init_to: initial reestablish timeout
*
*/
void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
{
xprt->reestablish_timeout <<= 1;
if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
xprt->reestablish_timeout = xprt->max_reconnect_timeout;
if (xprt->reestablish_timeout < init_to)
xprt->reestablish_timeout = init_to;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);
enum xprt_xid_rb_cmp { enum xprt_xid_rb_cmp {
XID_RB_EQUAL, XID_RB_EQUAL,
XID_RB_LEFT, XID_RB_LEFT,
......
...@@ -366,6 +366,9 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, ...@@ -366,6 +366,9 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
unsigned int pos; unsigned int pos;
int nsegs; int nsegs;
if (rtype == rpcrdma_noch)
goto done;
pos = rqst->rq_snd_buf.head[0].iov_len; pos = rqst->rq_snd_buf.head[0].iov_len;
if (rtype == rpcrdma_areadch) if (rtype == rpcrdma_areadch)
pos = 0; pos = 0;
...@@ -389,7 +392,8 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, ...@@ -389,7 +392,8 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
nsegs -= mr->mr_nents; nsegs -= mr->mr_nents;
} while (nsegs); } while (nsegs);
return 0; done:
return encode_item_not_present(xdr);
} }
/* Register and XDR encode the Write list. Supports encoding a list /* Register and XDR encode the Write list. Supports encoding a list
...@@ -417,6 +421,9 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, ...@@ -417,6 +421,9 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
int nsegs, nchunks; int nsegs, nchunks;
__be32 *segcount; __be32 *segcount;
if (wtype != rpcrdma_writech)
goto done;
seg = req->rl_segments; seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
rqst->rq_rcv_buf.head[0].iov_len, rqst->rq_rcv_buf.head[0].iov_len,
...@@ -451,7 +458,8 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, ...@@ -451,7 +458,8 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
/* Update count of segments in this Write chunk */ /* Update count of segments in this Write chunk */
*segcount = cpu_to_be32(nchunks); *segcount = cpu_to_be32(nchunks);
return 0; done:
return encode_item_not_present(xdr);
} }
/* Register and XDR encode the Reply chunk. Supports encoding an array /* Register and XDR encode the Reply chunk. Supports encoding an array
...@@ -476,6 +484,9 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, ...@@ -476,6 +484,9 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
int nsegs, nchunks; int nsegs, nchunks;
__be32 *segcount; __be32 *segcount;
if (wtype != rpcrdma_replych)
return encode_item_not_present(xdr);
seg = req->rl_segments; seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg); nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
if (nsegs < 0) if (nsegs < 0)
...@@ -511,6 +522,16 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, ...@@ -511,6 +522,16 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
return 0; return 0;
} }
static void rpcrdma_sendctx_done(struct kref *kref)
{
struct rpcrdma_req *req =
container_of(kref, struct rpcrdma_req, rl_kref);
struct rpcrdma_rep *rep = req->rl_reply;
rpcrdma_complete_rqst(rep);
rep->rr_rxprt->rx_stats.reply_waits_for_send++;
}
/** /**
* rpcrdma_sendctx_unmap - DMA-unmap Send buffer * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
* @sc: sendctx containing SGEs to unmap * @sc: sendctx containing SGEs to unmap
...@@ -520,6 +541,9 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc) ...@@ -520,6 +541,9 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
{ {
struct ib_sge *sge; struct ib_sge *sge;
if (!sc->sc_unmap_count)
return;
/* The first two SGEs contain the transport header and /* The first two SGEs contain the transport header and
* the inline buffer. These are always left mapped so * the inline buffer. These are always left mapped so
* they can be cheaply re-used. * they can be cheaply re-used.
...@@ -529,9 +553,7 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc) ...@@ -529,9 +553,7 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length, ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
DMA_TO_DEVICE); DMA_TO_DEVICE);
if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
&sc->sc_req->rl_flags))
wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
} }
/* Prepare an SGE for the RPC-over-RDMA transport header. /* Prepare an SGE for the RPC-over-RDMA transport header.
...@@ -666,7 +688,7 @@ static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt, ...@@ -666,7 +688,7 @@ static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt,
out: out:
sc->sc_wr.num_sge += sge_no; sc->sc_wr.num_sge += sge_no;
if (sc->sc_unmap_count) if (sc->sc_unmap_count)
__set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); kref_get(&req->rl_kref);
return true; return true;
out_regbuf: out_regbuf:
...@@ -699,22 +721,28 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, ...@@ -699,22 +721,28 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req, u32 hdrlen, struct rpcrdma_req *req, u32 hdrlen,
struct xdr_buf *xdr, enum rpcrdma_chunktype rtype) struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{ {
int ret;
ret = -EAGAIN;
req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt); req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
if (!req->rl_sendctx) if (!req->rl_sendctx)
return -EAGAIN; goto err;
req->rl_sendctx->sc_wr.num_sge = 0; req->rl_sendctx->sc_wr.num_sge = 0;
req->rl_sendctx->sc_unmap_count = 0; req->rl_sendctx->sc_unmap_count = 0;
req->rl_sendctx->sc_req = req; req->rl_sendctx->sc_req = req;
__clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); kref_init(&req->rl_kref);
ret = -EIO;
if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen)) if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
return -EIO; goto err;
if (rtype != rpcrdma_areadch) if (rtype != rpcrdma_areadch)
if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype)) if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype))
return -EIO; goto err;
return 0; return 0;
err:
trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
return ret;
} }
/** /**
...@@ -842,50 +870,28 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) ...@@ -842,50 +870,28 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
* send a Call message with a Position Zero Read chunk and a * send a Call message with a Position Zero Read chunk and a
* regular Read chunk at the same time. * regular Read chunk at the same time.
*/ */
if (rtype != rpcrdma_noch) { ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
if (ret)
goto out_err;
}
ret = encode_item_not_present(xdr);
if (ret) if (ret)
goto out_err; goto out_err;
ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
if (wtype == rpcrdma_writech) {
ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
if (ret)
goto out_err;
}
ret = encode_item_not_present(xdr);
if (ret) if (ret)
goto out_err; goto out_err;
ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
if (wtype != rpcrdma_replych)
ret = encode_item_not_present(xdr);
else
ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
if (ret) if (ret)
goto out_err; goto out_err;
trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype); ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
&rqst->rq_snd_buf, rtype); &rqst->rq_snd_buf, rtype);
if (ret) if (ret)
goto out_err; goto out_err;
trace_xprtrdma_marshal(req, rtype, wtype);
return 0; return 0;
out_err: out_err:
trace_xprtrdma_marshal_failed(rqst, ret); trace_xprtrdma_marshal_failed(rqst, ret);
switch (ret) { r_xprt->rx_stats.failed_marshal_count++;
case -EAGAIN: frwr_reset(req);
xprt_wait_for_buffer_space(rqst->rq_xprt);
break;
case -ENOBUFS:
break;
default:
r_xprt->rx_stats.failed_marshal_count++;
}
return ret; return ret;
} }
...@@ -1269,51 +1275,17 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep) ...@@ -1269,51 +1275,17 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
goto out; goto out;
} }
void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) static void rpcrdma_reply_done(struct kref *kref)
{
/* Invalidate and unmap the data payloads before waking
* the waiting application. This guarantees the memory
* regions are properly fenced from the server before the
* application accesses the data. It also ensures proper
* send flow control: waking the next RPC waits until this
* RPC has relinquished all its Send Queue entries.
*/
if (!list_empty(&req->rl_registered))
frwr_unmap_sync(r_xprt, &req->rl_registered);
/* Ensure that any DMA mapped pages associated with
* the Send of the RPC Call have been unmapped before
* allowing the RPC to complete. This protects argument
* memory not controlled by the RPC client from being
* re-used before we're done with it.
*/
if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
r_xprt->rx_stats.reply_waits_for_send++;
out_of_line_wait_on_bit(&req->rl_flags,
RPCRDMA_REQ_F_TX_RESOURCES,
bit_wait,
TASK_UNINTERRUPTIBLE);
}
}
/* Reply handling runs in the poll worker thread. Anything that
* might wait is deferred to a separate workqueue.
*/
void rpcrdma_deferred_completion(struct work_struct *work)
{ {
struct rpcrdma_rep *rep = struct rpcrdma_req *req =
container_of(work, struct rpcrdma_rep, rr_work); container_of(kref, struct rpcrdma_req, rl_kref);
struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
trace_xprtrdma_defer_cmp(rep); rpcrdma_complete_rqst(req->rl_reply);
if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
frwr_reminv(rep, &req->rl_registered);
rpcrdma_release_rqst(r_xprt, req);
rpcrdma_complete_rqst(rep);
} }
/* Process received RPC/RDMA messages. /**
* rpcrdma_reply_handler - Process received RPC/RDMA messages
* @rep: Incoming rpcrdma_rep object to process
* *
* Errors must result in the RPC task either being awakened, or * Errors must result in the RPC task either being awakened, or
* allowed to timeout, to discover the errors at that time. * allowed to timeout, to discover the errors at that time.
...@@ -1373,10 +1345,16 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) ...@@ -1373,10 +1345,16 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
} }
req->rl_reply = rep; req->rl_reply = rep;
rep->rr_rqst = rqst; rep->rr_rqst = rqst;
clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
trace_xprtrdma_reply(rqst->rq_task, rep, req, credits); trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
queue_work(buf->rb_completion_wq, &rep->rr_work);
if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
frwr_reminv(rep, &req->rl_registered);
if (!list_empty(&req->rl_registered))
frwr_unmap_async(r_xprt, req);
/* LocalInv completion will complete the RPC */
else
kref_put(&req->rl_kref, rpcrdma_reply_done);
return; return;
out_badversion: out_badversion:
......
...@@ -298,6 +298,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt) ...@@ -298,6 +298,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)
module_put(THIS_MODULE); module_put(THIS_MODULE);
} }
/* 60 second timeout, no retries */
static const struct rpc_timeout xprt_rdma_default_timeout = { static const struct rpc_timeout xprt_rdma_default_timeout = {
.to_initval = 60 * HZ, .to_initval = 60 * HZ,
.to_maxval = 60 * HZ, .to_maxval = 60 * HZ,
...@@ -323,8 +324,9 @@ xprt_setup_rdma(struct xprt_create *args) ...@@ -323,8 +324,9 @@ xprt_setup_rdma(struct xprt_create *args)
if (!xprt) if (!xprt)
return ERR_PTR(-ENOMEM); return ERR_PTR(-ENOMEM);
/* 60 second timeout, no retries */
xprt->timeout = &xprt_rdma_default_timeout; xprt->timeout = &xprt_rdma_default_timeout;
xprt->connect_timeout = xprt->timeout->to_initval;
xprt->max_reconnect_timeout = xprt->timeout->to_maxval;
xprt->bind_timeout = RPCRDMA_BIND_TO; xprt->bind_timeout = RPCRDMA_BIND_TO;
xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO; xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
...@@ -487,31 +489,64 @@ xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task) ...@@ -487,31 +489,64 @@ xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
} }
/** /**
* xprt_rdma_connect - try to establish a transport connection * xprt_rdma_set_connect_timeout - set timeouts for establishing a connection
* @xprt: controlling transport instance
* @connect_timeout: reconnect timeout after client disconnects
* @reconnect_timeout: reconnect timeout after server disconnects
*
*/
static void xprt_rdma_tcp_set_connect_timeout(struct rpc_xprt *xprt,
unsigned long connect_timeout,
unsigned long reconnect_timeout)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
trace_xprtrdma_op_set_cto(r_xprt, connect_timeout, reconnect_timeout);
spin_lock(&xprt->transport_lock);
if (connect_timeout < xprt->connect_timeout) {
struct rpc_timeout to;
unsigned long initval;
to = *xprt->timeout;
initval = connect_timeout;
if (initval < RPCRDMA_INIT_REEST_TO << 1)
initval = RPCRDMA_INIT_REEST_TO << 1;
to.to_initval = initval;
to.to_maxval = initval;
r_xprt->rx_timeout = to;
xprt->timeout = &r_xprt->rx_timeout;
xprt->connect_timeout = connect_timeout;
}
if (reconnect_timeout < xprt->max_reconnect_timeout)
xprt->max_reconnect_timeout = reconnect_timeout;
spin_unlock(&xprt->transport_lock);
}
/**
* xprt_rdma_connect - schedule an attempt to reconnect
* @xprt: transport state * @xprt: transport state
* @task: RPC scheduler context * @task: RPC scheduler context (unused)
* *
*/ */
static void static void
xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task) xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
{ {
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
unsigned long delay;
trace_xprtrdma_op_connect(r_xprt); trace_xprtrdma_op_connect(r_xprt);
delay = 0;
if (r_xprt->rx_ep.rep_connected != 0) { if (r_xprt->rx_ep.rep_connected != 0) {
/* Reconnect */ delay = xprt_reconnect_delay(xprt);
schedule_delayed_work(&r_xprt->rx_connect_worker, xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO);
xprt->reestablish_timeout);
xprt->reestablish_timeout <<= 1;
if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
} else {
schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
if (!RPC_IS_ASYNC(task))
flush_delayed_work(&r_xprt->rx_connect_worker);
} }
queue_delayed_work(xprtiod_workqueue, &r_xprt->rx_connect_worker,
delay);
} }
/** /**
...@@ -550,8 +585,11 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) ...@@ -550,8 +585,11 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
static void static void
xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst) xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
{ {
struct rpcrdma_xprt *r_xprt =
container_of(xprt, struct rpcrdma_xprt, rx_xprt);
memset(rqst, 0, sizeof(*rqst)); memset(rqst, 0, sizeof(*rqst));
rpcrdma_buffer_put(rpcr_to_rdmar(rqst)); rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
rpc_wake_up_next(&xprt->backlog); rpc_wake_up_next(&xprt->backlog);
} }
...@@ -618,9 +656,16 @@ xprt_rdma_free(struct rpc_task *task) ...@@ -618,9 +656,16 @@ xprt_rdma_free(struct rpc_task *task)
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst); struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags))
rpcrdma_release_rqst(r_xprt, req);
trace_xprtrdma_op_free(task, req); trace_xprtrdma_op_free(task, req);
if (!list_empty(&req->rl_registered))
frwr_unmap_sync(r_xprt, req);
/* XXX: If the RPC is completing because of a signal and
* not because a reply was received, we ought to ensure
* that the Send completion has fired, so that memory
* involved with the Send is not still visible to the NIC.
*/
} }
/** /**
...@@ -667,7 +712,6 @@ xprt_rdma_send_request(struct rpc_rqst *rqst) ...@@ -667,7 +712,6 @@ xprt_rdma_send_request(struct rpc_rqst *rqst)
goto drop_connection; goto drop_connection;
rqst->rq_xtime = ktime_get(); rqst->rq_xtime = ktime_get();
__set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
goto drop_connection; goto drop_connection;
...@@ -760,6 +804,7 @@ static const struct rpc_xprt_ops xprt_rdma_procs = { ...@@ -760,6 +804,7 @@ static const struct rpc_xprt_ops xprt_rdma_procs = {
.send_request = xprt_rdma_send_request, .send_request = xprt_rdma_send_request,
.close = xprt_rdma_close, .close = xprt_rdma_close,
.destroy = xprt_rdma_destroy, .destroy = xprt_rdma_destroy,
.set_connect_timeout = xprt_rdma_tcp_set_connect_timeout,
.print_stats = xprt_rdma_print_stats, .print_stats = xprt_rdma_print_stats,
.enable_swap = xprt_rdma_enable_swap, .enable_swap = xprt_rdma_enable_swap,
.disable_swap = xprt_rdma_disable_swap, .disable_swap = xprt_rdma_disable_swap,
......
...@@ -89,14 +89,12 @@ static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp); ...@@ -89,14 +89,12 @@ static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
*/ */
static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt) static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
{ {
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rpcrdma_ia *ia = &r_xprt->rx_ia;
/* Flush Receives, then wait for deferred Reply work /* Flush Receives, then wait for deferred Reply work
* to complete. * to complete.
*/ */
ib_drain_rq(ia->ri_id->qp); ib_drain_rq(ia->ri_id->qp);
drain_workqueue(buf->rb_completion_wq);
/* Deferred Reply processing might have scheduled /* Deferred Reply processing might have scheduled
* local invalidations. * local invalidations.
...@@ -901,7 +899,7 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt) ...@@ -901,7 +899,7 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
* completions recently. This is a sign the Send Queue is * completions recently. This is a sign the Send Queue is
* backing up. Cause the caller to pause and try again. * backing up. Cause the caller to pause and try again.
*/ */
set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags); xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
r_xprt->rx_stats.empty_sendctx_q++; r_xprt->rx_stats.empty_sendctx_q++;
return NULL; return NULL;
} }
...@@ -936,10 +934,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc) ...@@ -936,10 +934,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
/* Paired with READ_ONCE */ /* Paired with READ_ONCE */
smp_store_release(&buf->rb_sc_tail, next_tail); smp_store_release(&buf->rb_sc_tail, next_tail);
if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) { xprt_write_space(&sc->sc_xprt->rx_xprt);
smp_mb__after_atomic();
xprt_write_space(&sc->sc_xprt->rx_xprt);
}
} }
static void static void
...@@ -977,8 +972,6 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt) ...@@ -977,8 +972,6 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
r_xprt->rx_stats.mrs_allocated += count; r_xprt->rx_stats.mrs_allocated += count;
spin_unlock(&buf->rb_mrlock); spin_unlock(&buf->rb_mrlock);
trace_xprtrdma_createmrs(r_xprt, count); trace_xprtrdma_createmrs(r_xprt, count);
xprt_write_space(&r_xprt->rx_xprt);
} }
static void static void
...@@ -990,6 +983,7 @@ rpcrdma_mr_refresh_worker(struct work_struct *work) ...@@ -990,6 +983,7 @@ rpcrdma_mr_refresh_worker(struct work_struct *work)
rx_buf); rx_buf);
rpcrdma_mrs_create(r_xprt); rpcrdma_mrs_create(r_xprt);
xprt_write_space(&r_xprt->rx_xprt);
} }
/** /**
...@@ -1025,7 +1019,6 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size, ...@@ -1025,7 +1019,6 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
if (!req->rl_recvbuf) if (!req->rl_recvbuf)
goto out4; goto out4;
req->rl_buffer = buffer;
INIT_LIST_HEAD(&req->rl_registered); INIT_LIST_HEAD(&req->rl_registered);
spin_lock(&buffer->rb_lock); spin_lock(&buffer->rb_lock);
list_add(&req->rl_all, &buffer->rb_allreqs); list_add(&req->rl_all, &buffer->rb_allreqs);
...@@ -1042,9 +1035,9 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size, ...@@ -1042,9 +1035,9 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
return NULL; return NULL;
} }
static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp) static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
bool temp)
{ {
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_rep *rep; struct rpcrdma_rep *rep;
rep = kzalloc(sizeof(*rep), GFP_KERNEL); rep = kzalloc(sizeof(*rep), GFP_KERNEL);
...@@ -1055,27 +1048,22 @@ static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp) ...@@ -1055,27 +1048,22 @@ static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp)
DMA_FROM_DEVICE, GFP_KERNEL); DMA_FROM_DEVICE, GFP_KERNEL);
if (!rep->rr_rdmabuf) if (!rep->rr_rdmabuf)
goto out_free; goto out_free;
xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf), xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
rdmab_length(rep->rr_rdmabuf)); rdmab_length(rep->rr_rdmabuf));
rep->rr_cqe.done = rpcrdma_wc_receive; rep->rr_cqe.done = rpcrdma_wc_receive;
rep->rr_rxprt = r_xprt; rep->rr_rxprt = r_xprt;
INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
rep->rr_recv_wr.next = NULL; rep->rr_recv_wr.next = NULL;
rep->rr_recv_wr.wr_cqe = &rep->rr_cqe; rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
rep->rr_recv_wr.num_sge = 1; rep->rr_recv_wr.num_sge = 1;
rep->rr_temp = temp; rep->rr_temp = temp;
return rep;
spin_lock(&buf->rb_lock);
list_add(&rep->rr_list, &buf->rb_recv_bufs);
spin_unlock(&buf->rb_lock);
return true;
out_free: out_free:
kfree(rep); kfree(rep);
out: out:
return false; return NULL;
} }
/** /**
...@@ -1089,7 +1077,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) ...@@ -1089,7 +1077,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
int i, rc; int i, rc;
buf->rb_flags = 0;
buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests; buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
buf->rb_bc_srv_max_requests = 0; buf->rb_bc_srv_max_requests = 0;
spin_lock_init(&buf->rb_mrlock); spin_lock_init(&buf->rb_mrlock);
...@@ -1122,15 +1109,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) ...@@ -1122,15 +1109,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
if (rc) if (rc)
goto out; goto out;
buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
WQ_MEM_RECLAIM | WQ_HIGHPRI,
0,
r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
if (!buf->rb_completion_wq) {
rc = -ENOMEM;
goto out;
}
return 0; return 0;
out: out:
rpcrdma_buffer_destroy(buf); rpcrdma_buffer_destroy(buf);
...@@ -1204,11 +1182,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) ...@@ -1204,11 +1182,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{ {
cancel_delayed_work_sync(&buf->rb_refresh_worker); cancel_delayed_work_sync(&buf->rb_refresh_worker);
if (buf->rb_completion_wq) {
destroy_workqueue(buf->rb_completion_wq);
buf->rb_completion_wq = NULL;
}
rpcrdma_sendctxs_destroy(buf); rpcrdma_sendctxs_destroy(buf);
while (!list_empty(&buf->rb_recv_bufs)) { while (!list_empty(&buf->rb_recv_bufs)) {
...@@ -1325,13 +1298,12 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) ...@@ -1325,13 +1298,12 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
/** /**
* rpcrdma_buffer_put - Put request/reply buffers back into pool * rpcrdma_buffer_put - Put request/reply buffers back into pool
* @buffers: buffer pool
* @req: object to return * @req: object to return
* *
*/ */
void void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
rpcrdma_buffer_put(struct rpcrdma_req *req)
{ {
struct rpcrdma_buffer *buffers = req->rl_buffer;
struct rpcrdma_rep *rep = req->rl_reply; struct rpcrdma_rep *rep = req->rl_reply;
req->rl_reply = NULL; req->rl_reply = NULL;
...@@ -1484,8 +1456,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, ...@@ -1484,8 +1456,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr; struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
int rc; int rc;
if (!ep->rep_send_count || if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
send_wr->send_flags |= IB_SEND_SIGNALED; send_wr->send_flags |= IB_SEND_SIGNALED;
ep->rep_send_count = ep->rep_send_batch; ep->rep_send_count = ep->rep_send_batch;
} else { } else {
...@@ -1505,11 +1476,13 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) ...@@ -1505,11 +1476,13 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{ {
struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ep *ep = &r_xprt->rx_ep; struct rpcrdma_ep *ep = &r_xprt->rx_ep;
struct ib_recv_wr *wr, *bad_wr; struct ib_recv_wr *i, *wr, *bad_wr;
struct rpcrdma_rep *rep;
int needed, count, rc; int needed, count, rc;
rc = 0; rc = 0;
count = 0; count = 0;
needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1); needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
if (ep->rep_receive_count > needed) if (ep->rep_receive_count > needed)
goto out; goto out;
...@@ -1517,51 +1490,65 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) ...@@ -1517,51 +1490,65 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
if (!temp) if (!temp)
needed += RPCRDMA_MAX_RECV_BATCH; needed += RPCRDMA_MAX_RECV_BATCH;
count = 0; /* fast path: all needed reps can be found on the free list */
wr = NULL; wr = NULL;
spin_lock(&buf->rb_lock);
while (needed) { while (needed) {
struct rpcrdma_regbuf *rb;
struct rpcrdma_rep *rep;
spin_lock(&buf->rb_lock);
rep = list_first_entry_or_null(&buf->rb_recv_bufs, rep = list_first_entry_or_null(&buf->rb_recv_bufs,
struct rpcrdma_rep, rr_list); struct rpcrdma_rep, rr_list);
if (likely(rep)) if (!rep)
list_del(&rep->rr_list); break;
spin_unlock(&buf->rb_lock);
if (!rep) {
if (!rpcrdma_rep_create(r_xprt, temp))
break;
continue;
}
rb = rep->rr_rdmabuf; list_del(&rep->rr_list);
if (!rpcrdma_regbuf_dma_map(r_xprt, rb)) { rep->rr_recv_wr.next = wr;
rpcrdma_recv_buffer_put(rep); wr = &rep->rr_recv_wr;
--needed;
}
spin_unlock(&buf->rb_lock);
while (needed) {
rep = rpcrdma_rep_create(r_xprt, temp);
if (!rep)
break; break;
}
trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
rep->rr_recv_wr.next = wr; rep->rr_recv_wr.next = wr;
wr = &rep->rr_recv_wr; wr = &rep->rr_recv_wr;
++count;
--needed; --needed;
} }
if (!count) if (!wr)
goto out; goto out;
for (i = wr; i; i = i->next) {
rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
goto release_wrs;
trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
++count;
}
rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr, rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
(const struct ib_recv_wr **)&bad_wr); (const struct ib_recv_wr **)&bad_wr);
out:
trace_xprtrdma_post_recvs(r_xprt, count, rc);
if (rc) { if (rc) {
for (wr = bad_wr; wr; wr = wr->next) { for (wr = bad_wr; wr;) {
struct rpcrdma_rep *rep; struct rpcrdma_rep *rep;
rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr); rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
wr = wr->next;
rpcrdma_recv_buffer_put(rep); rpcrdma_recv_buffer_put(rep);
--count; --count;
} }
} }
ep->rep_receive_count += count; ep->rep_receive_count += count;
out: return;
trace_xprtrdma_post_recvs(r_xprt, count, rc);
release_wrs:
for (i = wr; i;) {
rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
i = i->next;
rpcrdma_recv_buffer_put(rep);
}
} }
...@@ -44,7 +44,8 @@ ...@@ -44,7 +44,8 @@
#include <linux/wait.h> /* wait_queue_head_t, etc */ #include <linux/wait.h> /* wait_queue_head_t, etc */
#include <linux/spinlock.h> /* spinlock_t, etc */ #include <linux/spinlock.h> /* spinlock_t, etc */
#include <linux/atomic.h> /* atomic_t, etc */ #include <linux/atomic.h> /* atomic_t, etc */
#include <linux/kref.h> /* struct kref */
#include <linux/workqueue.h> /* struct work_struct */ #include <linux/workqueue.h> /* struct work_struct */
#include <rdma/rdma_cm.h> /* RDMA connection api */ #include <rdma/rdma_cm.h> /* RDMA connection api */
...@@ -202,10 +203,9 @@ struct rpcrdma_rep { ...@@ -202,10 +203,9 @@ struct rpcrdma_rep {
bool rr_temp; bool rr_temp;
struct rpcrdma_regbuf *rr_rdmabuf; struct rpcrdma_regbuf *rr_rdmabuf;
struct rpcrdma_xprt *rr_rxprt; struct rpcrdma_xprt *rr_rxprt;
struct work_struct rr_work; struct rpc_rqst *rr_rqst;
struct xdr_buf rr_hdrbuf; struct xdr_buf rr_hdrbuf;
struct xdr_stream rr_stream; struct xdr_stream rr_stream;
struct rpc_rqst *rr_rqst;
struct list_head rr_list; struct list_head rr_list;
struct ib_recv_wr rr_recv_wr; struct ib_recv_wr rr_recv_wr;
}; };
...@@ -240,18 +240,12 @@ struct rpcrdma_sendctx { ...@@ -240,18 +240,12 @@ struct rpcrdma_sendctx {
* An external memory region is any buffer or page that is registered * An external memory region is any buffer or page that is registered
* on the fly (ie, not pre-registered). * on the fly (ie, not pre-registered).
*/ */
enum rpcrdma_frwr_state { struct rpcrdma_req;
FRWR_IS_INVALID, /* ready to be used */
FRWR_IS_VALID, /* in use */
FRWR_FLUSHED_FR, /* flushed FASTREG WR */
FRWR_FLUSHED_LI, /* flushed LOCALINV WR */
};
struct rpcrdma_frwr { struct rpcrdma_frwr {
struct ib_mr *fr_mr; struct ib_mr *fr_mr;
struct ib_cqe fr_cqe; struct ib_cqe fr_cqe;
enum rpcrdma_frwr_state fr_state;
struct completion fr_linv_done; struct completion fr_linv_done;
struct rpcrdma_req *fr_req;
union { union {
struct ib_reg_wr fr_regwr; struct ib_reg_wr fr_regwr;
struct ib_send_wr fr_invwr; struct ib_send_wr fr_invwr;
...@@ -326,7 +320,6 @@ struct rpcrdma_buffer; ...@@ -326,7 +320,6 @@ struct rpcrdma_buffer;
struct rpcrdma_req { struct rpcrdma_req {
struct list_head rl_list; struct list_head rl_list;
struct rpc_rqst rl_slot; struct rpc_rqst rl_slot;
struct rpcrdma_buffer *rl_buffer;
struct rpcrdma_rep *rl_reply; struct rpcrdma_rep *rl_reply;
struct xdr_stream rl_stream; struct xdr_stream rl_stream;
struct xdr_buf rl_hdrbuf; struct xdr_buf rl_hdrbuf;
...@@ -336,18 +329,12 @@ struct rpcrdma_req { ...@@ -336,18 +329,12 @@ struct rpcrdma_req {
struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */ struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */
struct list_head rl_all; struct list_head rl_all;
unsigned long rl_flags; struct kref rl_kref;
struct list_head rl_registered; /* registered segments */ struct list_head rl_registered; /* registered segments */
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
}; };
/* rl_flags */
enum {
RPCRDMA_REQ_F_PENDING = 0,
RPCRDMA_REQ_F_TX_RESOURCES,
};
static inline struct rpcrdma_req * static inline struct rpcrdma_req *
rpcr_to_rdmar(const struct rpc_rqst *rqst) rpcr_to_rdmar(const struct rpc_rqst *rqst)
{ {
...@@ -391,22 +378,15 @@ struct rpcrdma_buffer { ...@@ -391,22 +378,15 @@ struct rpcrdma_buffer {
struct list_head rb_recv_bufs; struct list_head rb_recv_bufs;
struct list_head rb_allreqs; struct list_head rb_allreqs;
unsigned long rb_flags;
u32 rb_max_requests; u32 rb_max_requests;
u32 rb_credits; /* most recent credit grant */ u32 rb_credits; /* most recent credit grant */
u32 rb_bc_srv_max_requests; u32 rb_bc_srv_max_requests;
u32 rb_bc_max_requests; u32 rb_bc_max_requests;
struct workqueue_struct *rb_completion_wq;
struct delayed_work rb_refresh_worker; struct delayed_work rb_refresh_worker;
}; };
/* rb_flags */
enum {
RPCRDMA_BUF_F_EMPTY_SCQ = 0,
};
/* /*
* Statistics for RPCRDMA * Statistics for RPCRDMA
*/ */
...@@ -452,6 +432,7 @@ struct rpcrdma_xprt { ...@@ -452,6 +432,7 @@ struct rpcrdma_xprt {
struct rpcrdma_ep rx_ep; struct rpcrdma_ep rx_ep;
struct rpcrdma_buffer rx_buf; struct rpcrdma_buffer rx_buf;
struct delayed_work rx_connect_worker; struct delayed_work rx_connect_worker;
struct rpc_timeout rx_timeout;
struct rpcrdma_stats rx_stats; struct rpcrdma_stats rx_stats;
}; };
...@@ -518,7 +499,8 @@ rpcrdma_mr_recycle(struct rpcrdma_mr *mr) ...@@ -518,7 +499,8 @@ rpcrdma_mr_recycle(struct rpcrdma_mr *mr)
} }
struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *); struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
void rpcrdma_buffer_put(struct rpcrdma_req *); void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers,
struct rpcrdma_req *req);
void rpcrdma_recv_buffer_put(struct rpcrdma_rep *); void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size,
...@@ -564,6 +546,7 @@ rpcrdma_data_dir(bool writing) ...@@ -564,6 +546,7 @@ rpcrdma_data_dir(bool writing)
/* Memory registration calls xprtrdma/frwr_ops.c /* Memory registration calls xprtrdma/frwr_ops.c
*/ */
bool frwr_is_supported(struct ib_device *device); bool frwr_is_supported(struct ib_device *device);
void frwr_reset(struct rpcrdma_req *req);
int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep); int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep);
int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr); int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr);
void frwr_release_mr(struct rpcrdma_mr *mr); void frwr_release_mr(struct rpcrdma_mr *mr);
...@@ -574,8 +557,8 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, ...@@ -574,8 +557,8 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_mr **mr); struct rpcrdma_mr **mr);
int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req); int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req);
void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs); void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs);
void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
struct list_head *mrs); void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
/* /*
* RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
...@@ -598,9 +581,6 @@ int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst); ...@@ -598,9 +581,6 @@ int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *); void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep); void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
void rpcrdma_reply_handler(struct rpcrdma_rep *rep); void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req);
void rpcrdma_deferred_completion(struct work_struct *work);
static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len) static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
{ {
......
...@@ -2414,25 +2414,6 @@ static void xs_tcp_setup_socket(struct work_struct *work) ...@@ -2414,25 +2414,6 @@ static void xs_tcp_setup_socket(struct work_struct *work)
xprt_wake_pending_tasks(xprt, status); xprt_wake_pending_tasks(xprt, status);
} }
static unsigned long xs_reconnect_delay(const struct rpc_xprt *xprt)
{
unsigned long start, now = jiffies;
start = xprt->stat.connect_start + xprt->reestablish_timeout;
if (time_after(start, now))
return start - now;
return 0;
}
static void xs_reconnect_backoff(struct rpc_xprt *xprt)
{
xprt->reestablish_timeout <<= 1;
if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
xprt->reestablish_timeout = xprt->max_reconnect_timeout;
if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
}
/** /**
* xs_connect - connect a socket to a remote endpoint * xs_connect - connect a socket to a remote endpoint
* @xprt: pointer to transport structure * @xprt: pointer to transport structure
...@@ -2462,8 +2443,8 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) ...@@ -2462,8 +2443,8 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
/* Start by resetting any existing state */ /* Start by resetting any existing state */
xs_reset_transport(transport); xs_reset_transport(transport);
delay = xs_reconnect_delay(xprt); delay = xprt_reconnect_delay(xprt);
xs_reconnect_backoff(xprt); xprt_reconnect_backoff(xprt, XS_TCP_INIT_REEST_TO);
} else } else
dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); dprintk("RPC: xs_connect scheduled xprt %p\n", xprt);
......