Commit 951e721c, authored by Chuck Lever, committed by Anna Schumaker

xprtrdma: Introduce an FRMR recovery workqueue

After a transport disconnect, FRMRs can be left in an undetermined
state. In particular, the MR's rkey is no good.

Currently, FRMRs are fixed up by the transport connect worker, but
that can race with ->ro_unmap if an RPC happens to exit while the
transport connect worker is running.

A better way of dealing with broken FRMRs is to detect them before
they are re-used by ->ro_map. Such FRMRs are either already invalid
or are owned by the sending RPC, and thus no race with ->ro_unmap
is possible.

Introduce a mechanism for handing broken FRMRs to a workqueue to be
reset in a context that is appropriate for allocating resources
(i.e., an ib_alloc_fast_reg_mr() API call).

This mechanism is not yet used, but will be in subsequent patches.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Steve Wise <swise@opengridcomputing.com>
Reviewed-By: Devesh Sharma <devesh.sharma@avagotech.com>
Tested-By: Devesh Sharma <devesh.sharma@avagotech.com>
Reviewed-by: Doug Ledford <dledford@redhat.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
parent fc7fbb59
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -17,6 +17,74 @@
 # define RPCDBG_FACILITY	RPCDBG_TRANS
 #endif
 
+static struct workqueue_struct *frwr_recovery_wq;
+
+#define FRWR_RECOVERY_WQ_FLAGS		(WQ_UNBOUND | WQ_MEM_RECLAIM)
+
+int
+frwr_alloc_recovery_wq(void)
+{
+	frwr_recovery_wq = alloc_workqueue("frwr_recovery",
+					   FRWR_RECOVERY_WQ_FLAGS, 0);
+	return !frwr_recovery_wq ? -ENOMEM : 0;
+}
+
+void
+frwr_destroy_recovery_wq(void)
+{
+	struct workqueue_struct *wq;
+
+	if (!frwr_recovery_wq)
+		return;
+
+	wq = frwr_recovery_wq;
+	frwr_recovery_wq = NULL;
+	destroy_workqueue(wq);
+}
+
+/* Deferred reset of a single FRMR. Generate a fresh rkey by
+ * replacing the MR.
+ *
+ * There's no recovery if this fails. The FRMR is abandoned, but
+ * remains in rb_all. It will be cleaned up when the transport is
+ * destroyed.
+ */
+static void
+__frwr_recovery_worker(struct work_struct *work)
+{
+	struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
+					    r.frmr.fr_work);
+	struct rpcrdma_xprt *r_xprt = r->r.frmr.fr_xprt;
+	unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
+	struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
+
+	if (ib_dereg_mr(r->r.frmr.fr_mr))
+		goto out_fail;
+
+	r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(pd, depth);
+	if (IS_ERR(r->r.frmr.fr_mr))
+		goto out_fail;
+
+	dprintk("RPC:       %s: recovered FRMR %p\n", __func__, r);
+	r->r.frmr.fr_state = FRMR_IS_INVALID;
+	rpcrdma_put_mw(r_xprt, r);
+	return;
+
+out_fail:
+	pr_warn("RPC:       %s: FRMR %p unrecovered\n",
+		__func__, r);
+}
+
+/* A broken MR was discovered in a context that can't sleep.
+ * Defer recovery to the recovery worker.
+ */
+static void
+__frwr_queue_recovery(struct rpcrdma_mw *r)
+{
+	INIT_WORK(&r->r.frmr.fr_work, __frwr_recovery_worker);
+	queue_work(frwr_recovery_wq, &r->r.frmr.fr_work);
+}
+
 static int
 __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
 	    unsigned int depth)
@@ -128,7 +196,7 @@ frwr_sendcompletion(struct ib_wc *wc)
 
 	/* WARNING: Only wr_id and status are reliable at this point */
 	r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
-	dprintk("RPC:       %s: frmr %p (stale), status %d\n",
+	pr_warn("RPC:       %s: frmr %p flushed, status %d\n",
 		__func__, r, wc->status);
 	r->r.frmr.fr_state = FRMR_IS_STALE;
 }
@@ -165,6 +233,7 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
 		list_add(&r->mw_list, &buf->rb_mws);
 		list_add(&r->mw_all, &buf->rb_all);
 		r->mw_sendcompletion = frwr_sendcompletion;
+		r->r.frmr.fr_xprt = r_xprt;
 	}
 
 	return 0;
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -720,17 +720,24 @@ static void __exit xprt_rdma_cleanup(void)
 	if (rc)
 		dprintk("RPC:       %s: xprt_unregister returned %i\n",
 			__func__, rc);
+
+	frwr_destroy_recovery_wq();
 }
 
 static int __init xprt_rdma_init(void)
 {
 	int rc;
 
-	rc = xprt_register_transport(&xprt_rdma);
+	rc = frwr_alloc_recovery_wq();
 	if (rc)
 		return rc;
 
+	rc = xprt_register_transport(&xprt_rdma);
+	if (rc) {
+		frwr_destroy_recovery_wq();
+		return rc;
+	}
+
 	dprintk("RPCRDMA Module Init, register RPC RDMA transport\n");
 	dprintk("Defaults:\n");
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -203,6 +203,8 @@ struct rpcrdma_frmr {
 	struct ib_fast_reg_page_list	*fr_pgl;
 	struct ib_mr			*fr_mr;
 	enum rpcrdma_frmr_state		fr_state;
+	struct work_struct		fr_work;
+	struct rpcrdma_xprt		*fr_xprt;
 };
 
 struct rpcrdma_mw {
@@ -427,6 +429,9 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
 
 unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
 
+int frwr_alloc_recovery_wq(void);
+void frwr_destroy_recovery_wq(void);
+
 /*
  * Wrappers for chunk registration, shared by read/write chunk code.
  */
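Editor's note: the commit message says nothing calls __frwr_queue_recovery() yet. For orientation, below is a minimal, hypothetical sketch of how a follow-on patch could consume the mechanism from the ->ro_map path: any MR found in a state other than FRMR_IS_INVALID (the ready state, as set by __frwr_recovery_worker() above) is handed to the recovery workqueue and a fresh MR is taken instead. The helper name __frwr_get_usable_mw is invented for illustration; rpcrdma_get_mw() is the existing counterpart of the rpcrdma_put_mw() call visible in the diff.

/* Hypothetical helper, not part of this commit: acquire an MR that
 * is safe to register. Any MR that is not FRMR_IS_INVALID (i.e. not
 * ready for re-use) is sent to the recovery workqueue, where the
 * worker above replaces it and returns it to the free list.
 */
static struct rpcrdma_mw *
__frwr_get_usable_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
	do {
		if (mw)
			__frwr_queue_recovery(mw);
		mw = rpcrdma_get_mw(r_xprt);
		if (!mw)
			return NULL;	/* caller would return -ENOMEM */
	} while (mw->r.frmr.fr_state != FRMR_IS_INVALID);
	return mw;
}

Because ->ro_map runs in a context that cannot sleep, the allocation-heavy reset (ib_dereg_mr() followed by ib_alloc_fast_reg_mr()) stays in the worker, which is exactly why the workqueue exists.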