Commit 05eb06d8 authored by Chuck Lever's avatar Chuck Lever Committed by Anna Schumaker

xprtrdma: Fix occasional transport deadlock

Under high I/O workloads, I've noticed that an RPC/RDMA transport
occasionally deadlocks (IOPS goes to zero, and doesn't recover).
Diagnosis shows that the sendctx queue is empty, but when sendctxs
are returned to the queue, the xprt_write_space wake-up never
occurs. The wake-up logic in rpcrdma_sendctx_put_locked is racy.

I noticed that both EMPTY_SCQ and XPRT_WRITE_SPACE are implemented
via an atomic bit. Just one of those is sufficient. Removing
EMPTY_SCQ in favor of the generic bit mechanism makes the deadlock
un-reproducible.

Without EMPTY_SCQ, rpcrdma_buffer::rb_flags is no longer used and
is therefore removed.

Unfortunately this patch does not apply cleanly to stable. If
needed, someone will have to port it and test it.

Fixes: 2fad6592 ("xprtrdma: Wait on empty sendctx queue")
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
Signed-off-by: default avatarAnna Schumaker <Anna.Schumaker@Netapp.com>
parent 1310051c
......@@ -539,6 +539,33 @@ TRACE_EVENT(xprtrdma_marshal_failed,
)
);
TRACE_EVENT(xprtrdma_prepsend_failed,
TP_PROTO(const struct rpc_rqst *rqst,
int ret
),
TP_ARGS(rqst, ret),
TP_STRUCT__entry(
__field(unsigned int, task_id)
__field(unsigned int, client_id)
__field(u32, xid)
__field(int, ret)
),
TP_fast_assign(
__entry->task_id = rqst->rq_task->tk_pid;
__entry->client_id = rqst->rq_task->tk_client->cl_clid;
__entry->xid = be32_to_cpu(rqst->rq_xid);
__entry->ret = ret;
),
TP_printk("task:%u@%u xid=0x%08x: ret=%d",
__entry->task_id, __entry->client_id, __entry->xid,
__entry->ret
)
);
TRACE_EVENT(xprtrdma_post_send,
TP_PROTO(
const struct rpcrdma_req *req,
......
......@@ -391,7 +391,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
rpcrdma_mr_recycle(mr);
mr = rpcrdma_mr_get(r_xprt);
if (!mr)
return ERR_PTR(-EAGAIN);
goto out_getmr_err;
} while (mr->frwr.fr_state != FRWR_IS_INVALID);
frwr = &mr->frwr;
frwr->fr_state = FRWR_IS_VALID;
......@@ -448,6 +448,10 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
*out = mr;
return seg;
out_getmr_err:
xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
return ERR_PTR(-EAGAIN);
out_dmamap_err:
mr->mr_dir = DMA_NONE;
trace_xprtrdma_frwr_sgerr(mr, i);
......
......@@ -699,22 +699,28 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req, u32 hdrlen,
struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{
int ret;
ret = -EAGAIN;
req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
if (!req->rl_sendctx)
return -EAGAIN;
goto err;
req->rl_sendctx->sc_wr.num_sge = 0;
req->rl_sendctx->sc_unmap_count = 0;
req->rl_sendctx->sc_req = req;
__clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
ret = -EIO;
if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
return -EIO;
goto err;
if (rtype != rpcrdma_areadch)
if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype))
return -EIO;
goto err;
return 0;
err:
trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
return ret;
}
/**
......@@ -877,15 +883,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
out_err:
trace_xprtrdma_marshal_failed(rqst, ret);
switch (ret) {
case -EAGAIN:
xprt_wait_for_buffer_space(rqst->rq_xprt);
break;
case -ENOBUFS:
break;
default:
r_xprt->rx_stats.failed_marshal_count++;
}
return ret;
}
......
......@@ -901,7 +901,7 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
* completions recently. This is a sign the Send Queue is
* backing up. Cause the caller to pause and try again.
*/
set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags);
xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
r_xprt->rx_stats.empty_sendctx_q++;
return NULL;
}
......@@ -936,10 +936,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
/* Paired with READ_ONCE */
smp_store_release(&buf->rb_sc_tail, next_tail);
if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) {
smp_mb__after_atomic();
xprt_write_space(&sc->sc_xprt->rx_xprt);
}
}
static void
......@@ -977,8 +974,6 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
r_xprt->rx_stats.mrs_allocated += count;
spin_unlock(&buf->rb_mrlock);
trace_xprtrdma_createmrs(r_xprt, count);
xprt_write_space(&r_xprt->rx_xprt);
}
static void
......@@ -990,6 +985,7 @@ rpcrdma_mr_refresh_worker(struct work_struct *work)
rx_buf);
rpcrdma_mrs_create(r_xprt);
xprt_write_space(&r_xprt->rx_xprt);
}
/**
......@@ -1089,7 +1085,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
int i, rc;
buf->rb_flags = 0;
buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
buf->rb_bc_srv_max_requests = 0;
spin_lock_init(&buf->rb_mrlock);
......
......@@ -391,7 +391,6 @@ struct rpcrdma_buffer {
struct list_head rb_recv_bufs;
struct list_head rb_allreqs;
unsigned long rb_flags;
u32 rb_max_requests;
u32 rb_credits; /* most recent credit grant */
......@@ -402,11 +401,6 @@ struct rpcrdma_buffer {
struct delayed_work rb_refresh_worker;
};
/* rb_flags */
enum {
RPCRDMA_BUF_F_EMPTY_SCQ = 0,
};
/*
* Statistics for RPCRDMA
*/
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment