Commit 9590d083 authored by Chuck Lever's avatar Chuck Lever Committed by Trond Myklebust

xprtrdma: Use xprt_pin_rqst in rpcrdma_reply_handler

Adopt the use of xprt_pin_rqst to eliminate contention between
Call-side users of rb_lock and the use of rb_lock in
rpcrdma_reply_handler.

This replaces the mechanism introduced in 431af645 ("xprtrdma:
Fix client lock-up after application signal fires").

Use recv_lock to quickly find the completing rqst, pin it, then
drop the lock. At that point invalidation and pull-up of the Reply
XDR can be done. Both are often expensive operations.

Finally, take recv_lock again to signal completion to the RPC
layer. It also protects adjustment of "cwnd".

This greatly reduces the amount of time a lock is held by the
reply handler. Comparing lock_stat results shows a marked decrease
in contention on rb_lock and recv_lock.
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
[trond.myklebust@primarydata.com: Remove call to rpcrdma_buffer_put() from
   the "out_norqst:" path in rpcrdma_reply_handler.]
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@primarydata.com>
parent f9773b22
...@@ -855,6 +855,7 @@ void xprt_pin_rqst(struct rpc_rqst *req) ...@@ -855,6 +855,7 @@ void xprt_pin_rqst(struct rpc_rqst *req)
{ {
set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate); set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate);
} }
EXPORT_SYMBOL_GPL(xprt_pin_rqst);
/** /**
* xprt_unpin_rqst - Unpin a request on the transport receive list * xprt_unpin_rqst - Unpin a request on the transport receive list
...@@ -870,6 +871,7 @@ void xprt_unpin_rqst(struct rpc_rqst *req) ...@@ -870,6 +871,7 @@ void xprt_unpin_rqst(struct rpc_rqst *req)
if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate)) if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate))
wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV); wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV);
} }
EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
__must_hold(&req->rq_xprt->recv_lock) __must_hold(&req->rq_xprt->recv_lock)
......
...@@ -781,9 +781,6 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) ...@@ -781,9 +781,6 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
rtype = rpcrdma_areadch; rtype = rpcrdma_areadch;
} }
req->rl_xid = rqst->rq_xid;
rpcrdma_insert_req(&r_xprt->rx_buf, req);
/* This implementation supports the following combinations /* This implementation supports the following combinations
* of chunk lists in one RPC-over-RDMA Call message: * of chunk lists in one RPC-over-RDMA Call message:
* *
...@@ -1226,14 +1223,12 @@ rpcrdma_reply_handler(struct work_struct *work) ...@@ -1226,14 +1223,12 @@ rpcrdma_reply_handler(struct work_struct *work)
struct rpcrdma_rep *rep = struct rpcrdma_rep *rep =
container_of(work, struct rpcrdma_rep, rr_work); container_of(work, struct rpcrdma_rep, rr_work);
struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpc_xprt *xprt = &r_xprt->rx_xprt; struct rpc_xprt *xprt = &r_xprt->rx_xprt;
struct xdr_stream *xdr = &rep->rr_stream; struct xdr_stream *xdr = &rep->rr_stream;
struct rpcrdma_req *req; struct rpcrdma_req *req;
struct rpc_rqst *rqst; struct rpc_rqst *rqst;
__be32 *p, xid, vers, proc; __be32 *p, xid, vers, proc;
unsigned long cwnd; unsigned long cwnd;
struct list_head mws;
int status; int status;
dprintk("RPC: %s: incoming rep %p\n", __func__, rep); dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
...@@ -1259,21 +1254,14 @@ rpcrdma_reply_handler(struct work_struct *work) ...@@ -1259,21 +1254,14 @@ rpcrdma_reply_handler(struct work_struct *work)
/* Match incoming rpcrdma_rep to an rpcrdma_req to /* Match incoming rpcrdma_rep to an rpcrdma_req to
* get context for handling any incoming chunks. * get context for handling any incoming chunks.
*/ */
spin_lock(&buf->rb_lock); spin_lock(&xprt->recv_lock);
req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf, xid); rqst = xprt_lookup_rqst(xprt, xid);
if (!req) if (!rqst)
goto out_nomatch; goto out_norqst;
if (req->rl_reply) xprt_pin_rqst(rqst);
goto out_duplicate; spin_unlock(&xprt->recv_lock);
req = rpcr_to_rdmar(rqst);
list_replace_init(&req->rl_registered, &mws);
rpcrdma_mark_remote_invalidation(&mws, rep);
/* Avoid races with signals and duplicate replies
* by marking this req as matched.
*/
req->rl_reply = rep; req->rl_reply = rep;
spin_unlock(&buf->rb_lock);
dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n", dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
__func__, rep, req, be32_to_cpu(xid)); __func__, rep, req, be32_to_cpu(xid));
...@@ -1285,17 +1273,12 @@ rpcrdma_reply_handler(struct work_struct *work) ...@@ -1285,17 +1273,12 @@ rpcrdma_reply_handler(struct work_struct *work)
* waking the next RPC waits until this RPC has relinquished * waking the next RPC waits until this RPC has relinquished
* all its Send Queue entries. * all its Send Queue entries.
*/ */
if (!list_empty(&mws)) if (!list_empty(&req->rl_registered)) {
r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws); rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
&req->rl_registered);
}
/* Perform XID lookup, reconstruction of the RPC reply, and
* RPC completion while holding the transport lock to ensure
* the rep, rqst, and rq_task pointers remain stable.
*/
spin_lock(&xprt->recv_lock);
rqst = xprt_lookup_rqst(xprt, xid);
if (!rqst)
goto out_norqst;
xprt->reestablish_timeout = 0; xprt->reestablish_timeout = 0;
if (vers != rpcrdma_version) if (vers != rpcrdma_version)
goto out_badversion; goto out_badversion;
...@@ -1317,12 +1300,14 @@ rpcrdma_reply_handler(struct work_struct *work) ...@@ -1317,12 +1300,14 @@ rpcrdma_reply_handler(struct work_struct *work)
goto out_badheader; goto out_badheader;
out: out:
spin_lock(&xprt->recv_lock);
cwnd = xprt->cwnd; cwnd = xprt->cwnd;
xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT; xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
if (xprt->cwnd > cwnd) if (xprt->cwnd > cwnd)
xprt_release_rqst_cong(rqst->rq_task); xprt_release_rqst_cong(rqst->rq_task);
xprt_complete_rqst(rqst->rq_task, status); xprt_complete_rqst(rqst->rq_task, status);
xprt_unpin_rqst(rqst);
spin_unlock(&xprt->recv_lock); spin_unlock(&xprt->recv_lock);
dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n", dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
__func__, xprt, rqst, status); __func__, xprt, rqst, status);
...@@ -1360,26 +1345,13 @@ rpcrdma_reply_handler(struct work_struct *work) ...@@ -1360,26 +1345,13 @@ rpcrdma_reply_handler(struct work_struct *work)
*/ */
out_norqst: out_norqst:
spin_unlock(&xprt->recv_lock); spin_unlock(&xprt->recv_lock);
rpcrdma_buffer_put(req);
dprintk("RPC: %s: race, no rqst left for req %p\n",
__func__, req);
return;
out_shortreply:
dprintk("RPC: %s: short/invalid reply\n", __func__);
goto repost;
out_nomatch:
spin_unlock(&buf->rb_lock);
dprintk("RPC: %s: no match for incoming xid 0x%08x\n", dprintk("RPC: %s: no match for incoming xid 0x%08x\n",
__func__, be32_to_cpu(xid)); __func__, be32_to_cpu(xid));
goto repost; goto repost;
out_duplicate: out_shortreply:
spin_unlock(&buf->rb_lock); dprintk("RPC: %s: short/invalid reply\n", __func__);
dprintk("RPC: %s: " goto repost;
"duplicate reply %p to RPC request %p: xid 0x%08x\n",
__func__, rep, req, be32_to_cpu(xid));
/* If no pending RPC transaction was matched, post a replacement /* If no pending RPC transaction was matched, post a replacement
* receive buffer before returning. * receive buffer before returning.
......
...@@ -685,7 +685,6 @@ xprt_rdma_free(struct rpc_task *task) ...@@ -685,7 +685,6 @@ xprt_rdma_free(struct rpc_task *task)
dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply); dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);
rpcrdma_remove_req(&r_xprt->rx_buf, req);
if (!list_empty(&req->rl_registered)) if (!list_empty(&req->rl_registered))
ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task)); ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
rpcrdma_unmap_sges(ia, req); rpcrdma_unmap_sges(ia, req);
......
...@@ -1001,7 +1001,6 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) ...@@ -1001,7 +1001,6 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
spin_lock_init(&buf->rb_recovery_lock); spin_lock_init(&buf->rb_recovery_lock);
INIT_LIST_HEAD(&buf->rb_mws); INIT_LIST_HEAD(&buf->rb_mws);
INIT_LIST_HEAD(&buf->rb_all); INIT_LIST_HEAD(&buf->rb_all);
INIT_LIST_HEAD(&buf->rb_pending);
INIT_LIST_HEAD(&buf->rb_stale_mrs); INIT_LIST_HEAD(&buf->rb_stale_mrs);
INIT_DELAYED_WORK(&buf->rb_refresh_worker, INIT_DELAYED_WORK(&buf->rb_refresh_worker,
rpcrdma_mr_refresh_worker); rpcrdma_mr_refresh_worker);
......
...@@ -340,7 +340,6 @@ enum { ...@@ -340,7 +340,6 @@ enum {
struct rpcrdma_buffer; struct rpcrdma_buffer;
struct rpcrdma_req { struct rpcrdma_req {
struct list_head rl_list; struct list_head rl_list;
__be32 rl_xid;
unsigned int rl_mapped_sges; unsigned int rl_mapped_sges;
unsigned int rl_connect_cookie; unsigned int rl_connect_cookie;
struct rpcrdma_buffer *rl_buffer; struct rpcrdma_buffer *rl_buffer;
...@@ -404,7 +403,6 @@ struct rpcrdma_buffer { ...@@ -404,7 +403,6 @@ struct rpcrdma_buffer {
int rb_send_count, rb_recv_count; int rb_send_count, rb_recv_count;
struct list_head rb_send_bufs; struct list_head rb_send_bufs;
struct list_head rb_recv_bufs; struct list_head rb_recv_bufs;
struct list_head rb_pending;
u32 rb_max_requests; u32 rb_max_requests;
atomic_t rb_credits; /* most recent credit grant */ atomic_t rb_credits; /* most recent credit grant */
...@@ -557,34 +555,6 @@ void rpcrdma_destroy_req(struct rpcrdma_req *); ...@@ -557,34 +555,6 @@ void rpcrdma_destroy_req(struct rpcrdma_req *);
int rpcrdma_buffer_create(struct rpcrdma_xprt *); int rpcrdma_buffer_create(struct rpcrdma_xprt *);
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
static inline void
rpcrdma_insert_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
spin_lock(&buffers->rb_lock);
if (list_empty(&req->rl_list))
list_add_tail(&req->rl_list, &buffers->rb_pending);
spin_unlock(&buffers->rb_lock);
}
static inline struct rpcrdma_req *
rpcrdma_lookup_req_locked(struct rpcrdma_buffer *buffers, __be32 xid)
{
struct rpcrdma_req *pos;
list_for_each_entry(pos, &buffers->rb_pending, rl_list)
if (pos->rl_xid == xid)
return pos;
return NULL;
}
static inline void
rpcrdma_remove_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
spin_lock(&buffers->rb_lock);
list_del(&req->rl_list);
spin_unlock(&buffers->rb_lock);
}
struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *); struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *); void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *); struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment