Commit aaf45bd8 authored by Bart Van Assche's avatar Bart Van Assche Committed by Doug Ledford

IB/srpt: Detect session shutdown reliably

The Last WQE Reached event is only generated after one or more work
requests have been queued on the QP associated with a session. Since
session shutdown can start before any work requests have been queued,
use a zero-length RDMA write to wait until a QP has been drained.

Additionally, rework the code for closing and disconnecting a session.
Signed-off-by: default avatarBart Van Assche <bart.vanassche@sandisk.com>
Reviewed-by: default avatarChristoph Hellwig <hch@lst.de>
Cc: Sagi Grimberg <sagig@mellanox.com>
Cc: Alex Estrin <alex.estrin@intel.com>
Signed-off-by: default avatarDoug Ledford <dledford@redhat.com>
parent 8628991f
...@@ -92,10 +92,11 @@ MODULE_PARM_DESC(srpt_service_guid, ...@@ -92,10 +92,11 @@ MODULE_PARM_DESC(srpt_service_guid,
static struct ib_client srpt_client; static struct ib_client srpt_client;
static void srpt_release_cmd(struct se_cmd *se_cmd); static void srpt_release_cmd(struct se_cmd *se_cmd);
static void srpt_release_channel(struct srpt_rdma_ch *ch); static void srpt_free_ch(struct kref *kref);
static int srpt_queue_status(struct se_cmd *cmd); static int srpt_queue_status(struct se_cmd *cmd);
static void srpt_recv_done(struct ib_cq *cq, struct ib_wc *wc); static void srpt_recv_done(struct ib_cq *cq, struct ib_wc *wc);
static void srpt_send_done(struct ib_cq *cq, struct ib_wc *wc); static void srpt_send_done(struct ib_cq *cq, struct ib_wc *wc);
static void srpt_zerolength_write_done(struct ib_cq *cq, struct ib_wc *wc);
/* /*
* The only allowed channel state changes are those that change the channel * The only allowed channel state changes are those that change the channel
...@@ -175,6 +176,23 @@ static void srpt_srq_event(struct ib_event *event, void *ctx) ...@@ -175,6 +176,23 @@ static void srpt_srq_event(struct ib_event *event, void *ctx)
pr_info("SRQ event %d\n", event->event); pr_info("SRQ event %d\n", event->event);
} }
static const char *get_ch_state_name(enum rdma_ch_state s)
{
switch (s) {
case CH_CONNECTING:
return "connecting";
case CH_LIVE:
return "live";
case CH_DISCONNECTING:
return "disconnecting";
case CH_DRAINING:
return "draining";
case CH_DISCONNECTED:
return "disconnected";
}
return "???";
}
/** /**
* srpt_qp_event() - QP event callback function. * srpt_qp_event() - QP event callback function.
*/ */
...@@ -188,11 +206,9 @@ static void srpt_qp_event(struct ib_event *event, struct srpt_rdma_ch *ch) ...@@ -188,11 +206,9 @@ static void srpt_qp_event(struct ib_event *event, struct srpt_rdma_ch *ch)
ib_cm_notify(ch->cm_id, event->event); ib_cm_notify(ch->cm_id, event->event);
break; break;
case IB_EVENT_QP_LAST_WQE_REACHED: case IB_EVENT_QP_LAST_WQE_REACHED:
if (srpt_set_ch_state(ch, CH_RELEASING)) pr_debug("%s-%d, state %s: received Last WQE event.\n",
srpt_release_channel(ch); ch->sess_name, ch->qp->qp_num,
else get_ch_state_name(ch->state));
pr_debug("%s: state %d - ignored LAST_WQE.\n",
ch->sess_name, ch->state);
break; break;
default: default:
pr_err("received unrecognized IB QP event %d\n", event->event); pr_err("received unrecognized IB QP event %d\n", event->event);
...@@ -794,6 +810,37 @@ static int srpt_post_send(struct srpt_rdma_ch *ch, ...@@ -794,6 +810,37 @@ static int srpt_post_send(struct srpt_rdma_ch *ch,
return ret; return ret;
} }
/**
* srpt_zerolength_write() - Perform a zero-length RDMA write.
*
* A quote from the InfiniBand specification: C9-88: For an HCA responder
* using Reliable Connection service, for each zero-length RDMA READ or WRITE
* request, the R_Key shall not be validated, even if the request includes
* Immediate data.
*/
static int srpt_zerolength_write(struct srpt_rdma_ch *ch)
{
struct ib_send_wr wr, *bad_wr;
memset(&wr, 0, sizeof(wr));
wr.opcode = IB_WR_RDMA_WRITE;
wr.wr_cqe = &ch->zw_cqe;
wr.send_flags = IB_SEND_SIGNALED;
return ib_post_send(ch->qp, &wr, &bad_wr);
}
static void srpt_zerolength_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
struct srpt_rdma_ch *ch = cq->cq_context;
WARN(wc->status == IB_WC_SUCCESS, "%s-%d: QP not in error state\n",
ch->sess_name, ch->qp->qp_num);
if (srpt_set_ch_state(ch, CH_DISCONNECTED))
schedule_work(&ch->release_work);
else
WARN_ONCE("%s-%d\n", ch->sess_name, ch->qp->qp_num);
}
/** /**
* srpt_get_desc_tbl() - Parse the data descriptors of an SRP_CMD request. * srpt_get_desc_tbl() - Parse the data descriptors of an SRP_CMD request.
* @ioctx: Pointer to the I/O context associated with the request. * @ioctx: Pointer to the I/O context associated with the request.
...@@ -1816,110 +1863,102 @@ static void srpt_destroy_ch_ib(struct srpt_rdma_ch *ch) ...@@ -1816,110 +1863,102 @@ static void srpt_destroy_ch_ib(struct srpt_rdma_ch *ch)
} }
/** /**
* __srpt_close_ch() - Close an RDMA channel by setting the QP error state. * srpt_close_ch() - Close an RDMA channel.
* *
* Reset the QP and make sure all resources associated with the channel will * Make sure all resources associated with the channel will be deallocated at
* be deallocated at an appropriate time. * an appropriate time.
* *
* Note: The caller must hold ch->sport->sdev->spinlock. * Returns true if and only if the channel state has been modified into
* CH_DRAINING.
*/ */
static void __srpt_close_ch(struct srpt_rdma_ch *ch) static bool srpt_close_ch(struct srpt_rdma_ch *ch)
{ {
enum rdma_ch_state prev_state; int ret;
unsigned long flags;
spin_lock_irqsave(&ch->spinlock, flags); if (!srpt_set_ch_state(ch, CH_DRAINING)) {
prev_state = ch->state; pr_debug("%s-%d: already closed\n", ch->sess_name,
switch (prev_state) { ch->qp->qp_num);
case CH_CONNECTING: return false;
case CH_LIVE:
ch->state = CH_DISCONNECTING;
break;
default:
break;
} }
spin_unlock_irqrestore(&ch->spinlock, flags);
switch (prev_state) { kref_get(&ch->kref);
case CH_CONNECTING:
ib_send_cm_rej(ch->cm_id, IB_CM_REJ_NO_RESOURCES, NULL, 0,
NULL, 0);
/* fall through */
case CH_LIVE:
if (ib_send_cm_dreq(ch->cm_id, NULL, 0) < 0)
pr_err("sending CM DREQ failed.\n");
break;
case CH_DISCONNECTING:
break;
case CH_DRAINING:
case CH_RELEASING:
break;
}
}
/** ret = srpt_ch_qp_err(ch);
* srpt_close_ch() - Close an RDMA channel. if (ret < 0)
*/ pr_err("%s-%d: changing queue pair into error state failed: %d\n",
static void srpt_close_ch(struct srpt_rdma_ch *ch) ch->sess_name, ch->qp->qp_num, ret);
{
struct srpt_device *sdev = ch->sport->sdev;
mutex_lock(&sdev->mutex); pr_debug("%s-%d: queued zerolength write\n", ch->sess_name,
__srpt_close_ch(ch); ch->qp->qp_num);
mutex_unlock(&sdev->mutex); ret = srpt_zerolength_write(ch);
} if (ret < 0) {
pr_err("%s-%d: queuing zero-length write failed: %d\n",
ch->sess_name, ch->qp->qp_num, ret);
if (srpt_set_ch_state(ch, CH_DISCONNECTED))
schedule_work(&ch->release_work);
else
WARN_ON_ONCE(true);
}
/** kref_put(&ch->kref, srpt_free_ch);
* srpt_shutdown_session() - Whether or not a session may be shut down.
*/ return true;
static int srpt_shutdown_session(struct se_session *se_sess)
{
return 1;
} }
/** /*
* srpt_drain_channel() - Drain a channel by resetting the IB queue pair. * Change the channel state into CH_DISCONNECTING. If a channel has not yet
* @cm_id: Pointer to the CM ID of the channel to be drained. * reached the connected state, close it. If a channel is in the connected
* * state, send a DREQ. If a DREQ has been received, send a DREP. Note: it is
* Note: Must be called from inside srpt_cm_handler to avoid a race between * the responsibility of the caller to ensure that this function is not
* accessing sdev->spinlock and the call to kfree(sdev) in srpt_remove_one() * invoked concurrently with the code that accepts a connection. This means
* (the caller of srpt_cm_handler holds the cm_id spinlock; srpt_remove_one() * that this function must either be invoked from inside a CM callback
* waits until all target sessions for the associated IB device have been * function or that it must be invoked with the srpt_port.mutex held.
* unregistered and target session registration involves a call to
* ib_destroy_cm_id(), which locks the cm_id spinlock and hence waits until
* this function has finished).
*/ */
static void srpt_drain_channel(struct srpt_rdma_ch *ch) static int srpt_disconnect_ch(struct srpt_rdma_ch *ch)
{ {
int ret; int ret;
bool do_reset = false;
WARN_ON_ONCE(irqs_disabled()); if (!srpt_set_ch_state(ch, CH_DISCONNECTING))
return -ENOTCONN;
ret = ib_send_cm_dreq(ch->cm_id, NULL, 0);
if (ret < 0)
ret = ib_send_cm_drep(ch->cm_id, NULL, 0);
if (ret < 0 && srpt_close_ch(ch))
ret = 0;
return ret;
}
do_reset = srpt_set_ch_state(ch, CH_DRAINING); static void __srpt_close_all_ch(struct srpt_device *sdev)
{
struct srpt_rdma_ch *ch;
if (do_reset) { lockdep_assert_held(&sdev->mutex);
if (ch->sess)
srpt_shutdown_session(ch->sess);
ret = srpt_ch_qp_err(ch); list_for_each_entry(ch, &sdev->rch_list, list) {
if (ret < 0) if (srpt_disconnect_ch(ch) >= 0)
pr_err("Setting queue pair in error state" pr_info("Closing channel %s-%d because target %s has been disabled\n",
" failed: %d\n", ret); ch->sess_name, ch->qp->qp_num,
sdev->device->name);
srpt_close_ch(ch);
} }
} }
/** /**
* srpt_release_channel() - Release channel resources. * srpt_shutdown_session() - Whether or not a session may be shut down.
*
* Schedules the actual release because:
* - Calling the ib_destroy_cm_id() call from inside an IB CM callback would
* trigger a deadlock.
* - It is not safe to call TCM transport_* functions from interrupt context.
*/ */
static void srpt_release_channel(struct srpt_rdma_ch *ch) static int srpt_shutdown_session(struct se_session *se_sess)
{
return 1;
}
static void srpt_free_ch(struct kref *kref)
{ {
schedule_work(&ch->release_work); struct srpt_rdma_ch *ch = container_of(kref, struct srpt_rdma_ch, kref);
kfree(ch);
} }
static void srpt_release_channel_work(struct work_struct *w) static void srpt_release_channel_work(struct work_struct *w)
...@@ -1961,7 +2000,7 @@ static void srpt_release_channel_work(struct work_struct *w) ...@@ -1961,7 +2000,7 @@ static void srpt_release_channel_work(struct work_struct *w)
wake_up(&sdev->ch_releaseQ); wake_up(&sdev->ch_releaseQ);
kfree(ch); kref_put(&ch->kref, srpt_free_ch);
} }
/** /**
...@@ -2046,17 +2085,10 @@ static int srpt_cm_req_recv(struct ib_cm_id *cm_id, ...@@ -2046,17 +2085,10 @@ static int srpt_cm_req_recv(struct ib_cm_id *cm_id,
&& param->port == ch->sport->port && param->port == ch->sport->port
&& param->listen_id == ch->sport->sdev->cm_id && param->listen_id == ch->sport->sdev->cm_id
&& ch->cm_id) { && ch->cm_id) {
if (ch->state != CH_CONNECTING if (srpt_disconnect_ch(ch) < 0)
&& ch->state != CH_LIVE)
continue; continue;
pr_info("Relogin - closed existing channel %s\n",
/* found an existing channel */ ch->sess_name);
pr_debug("Found existing channel %s"
" cm_id= %p state= %d\n",
ch->sess_name, ch->cm_id, ch->state);
__srpt_close_ch(ch);
rsp->rsp_flags = rsp->rsp_flags =
SRP_LOGIN_RSP_MULTICHAN_TERMINATED; SRP_LOGIN_RSP_MULTICHAN_TERMINATED;
} }
...@@ -2087,6 +2119,8 @@ static int srpt_cm_req_recv(struct ib_cm_id *cm_id, ...@@ -2087,6 +2119,8 @@ static int srpt_cm_req_recv(struct ib_cm_id *cm_id,
goto reject; goto reject;
} }
kref_init(&ch->kref);
ch->zw_cqe.done = srpt_zerolength_write_done;
INIT_WORK(&ch->release_work, srpt_release_channel_work); INIT_WORK(&ch->release_work, srpt_release_channel_work);
memcpy(ch->i_port_id, req->initiator_port_id, 16); memcpy(ch->i_port_id, req->initiator_port_id, 16);
memcpy(ch->t_port_id, req->target_port_id, 16); memcpy(ch->t_port_id, req->target_port_id, 16);
...@@ -2214,7 +2248,7 @@ static int srpt_cm_req_recv(struct ib_cm_id *cm_id, ...@@ -2214,7 +2248,7 @@ static int srpt_cm_req_recv(struct ib_cm_id *cm_id,
goto out; goto out;
release_channel: release_channel:
srpt_set_ch_state(ch, CH_RELEASING); srpt_disconnect_ch(ch);
transport_deregister_session_configfs(ch->sess); transport_deregister_session_configfs(ch->sess);
transport_deregister_session(ch->sess); transport_deregister_session(ch->sess);
ch->sess = NULL; ch->sess = NULL;
...@@ -2263,7 +2297,6 @@ static void srpt_cm_rej_recv(struct srpt_rdma_ch *ch, ...@@ -2263,7 +2297,6 @@ static void srpt_cm_rej_recv(struct srpt_rdma_ch *ch,
ch->sess_name, ch->qp->qp_num, reason, private_data_len ? ch->sess_name, ch->qp->qp_num, reason, private_data_len ?
"; private data" : "", priv ? priv : " (?)"); "; private data" : "", priv ? priv : " (?)");
kfree(priv); kfree(priv);
srpt_drain_channel(ch);
} }
/** /**
...@@ -2291,40 +2324,6 @@ static void srpt_cm_rtu_recv(struct srpt_rdma_ch *ch) ...@@ -2291,40 +2324,6 @@ static void srpt_cm_rtu_recv(struct srpt_rdma_ch *ch)
} }
} }
/**
* srpt_cm_dreq_recv() - Process reception of a DREQ message.
*/
static void srpt_cm_dreq_recv(struct srpt_rdma_ch *ch)
{
unsigned long flags;
bool send_drep = false;
pr_debug("ch %s-%d state %d\n", ch->sess_name, ch->qp->qp_num,
ch->state);
spin_lock_irqsave(&ch->spinlock, flags);
switch (ch->state) {
case CH_CONNECTING:
case CH_LIVE:
send_drep = true;
ch->state = CH_DISCONNECTING;
break;
case CH_DISCONNECTING:
case CH_DRAINING:
case CH_RELEASING:
WARN(true, "unexpected channel state %d\n", ch->state);
break;
}
spin_unlock_irqrestore(&ch->spinlock, flags);
if (send_drep) {
if (ib_send_cm_drep(ch->cm_id, NULL, 0) < 0)
pr_err("Sending IB DREP failed.\n");
pr_info("Received DREQ and sent DREP for session %s.\n",
ch->sess_name);
}
}
/** /**
* srpt_cm_handler() - IB connection manager callback function. * srpt_cm_handler() - IB connection manager callback function.
* *
...@@ -2356,22 +2355,21 @@ static int srpt_cm_handler(struct ib_cm_id *cm_id, struct ib_cm_event *event) ...@@ -2356,22 +2355,21 @@ static int srpt_cm_handler(struct ib_cm_id *cm_id, struct ib_cm_event *event)
srpt_cm_rtu_recv(ch); srpt_cm_rtu_recv(ch);
break; break;
case IB_CM_DREQ_RECEIVED: case IB_CM_DREQ_RECEIVED:
srpt_cm_dreq_recv(ch); srpt_disconnect_ch(ch);
break; break;
case IB_CM_DREP_RECEIVED: case IB_CM_DREP_RECEIVED:
pr_info("Received CM DREP message for ch %s-%d.\n", pr_info("Received CM DREP message for ch %s-%d.\n",
ch->sess_name, ch->qp->qp_num); ch->sess_name, ch->qp->qp_num);
srpt_drain_channel(ch); srpt_close_ch(ch);
break; break;
case IB_CM_TIMEWAIT_EXIT: case IB_CM_TIMEWAIT_EXIT:
pr_info("Received CM TimeWait exit for ch %s-%d.\n", pr_info("Received CM TimeWait exit for ch %s-%d.\n",
ch->sess_name, ch->qp->qp_num); ch->sess_name, ch->qp->qp_num);
srpt_drain_channel(ch); srpt_close_ch(ch);
break; break;
case IB_CM_REP_ERROR: case IB_CM_REP_ERROR:
pr_info("Received CM REP error for ch %s-%d.\n", ch->sess_name, pr_info("Received CM REP error for ch %s-%d.\n", ch->sess_name,
ch->qp->qp_num); ch->qp->qp_num);
srpt_drain_channel(ch);
break; break;
case IB_CM_DREQ_ERROR: case IB_CM_DREQ_ERROR:
pr_info("Received CM DREQ ERROR event.\n"); pr_info("Received CM DREQ ERROR event.\n");
...@@ -2511,7 +2509,7 @@ static int srpt_write_pending(struct se_cmd *se_cmd) ...@@ -2511,7 +2509,7 @@ static int srpt_write_pending(struct se_cmd *se_cmd)
break; break;
case CH_DISCONNECTING: case CH_DISCONNECTING:
case CH_DRAINING: case CH_DRAINING:
case CH_RELEASING: case CH_DISCONNECTED:
pr_debug("cmd with tag %lld: channel disconnecting\n", pr_debug("cmd with tag %lld: channel disconnecting\n",
ioctx->cmd.tag); ioctx->cmd.tag);
srpt_set_cmd_state(ioctx, SRPT_STATE_DATA_IN); srpt_set_cmd_state(ioctx, SRPT_STATE_DATA_IN);
...@@ -2657,16 +2655,16 @@ static void srpt_refresh_port_work(struct work_struct *work) ...@@ -2657,16 +2655,16 @@ static void srpt_refresh_port_work(struct work_struct *work)
*/ */
static int srpt_release_sdev(struct srpt_device *sdev) static int srpt_release_sdev(struct srpt_device *sdev)
{ {
struct srpt_rdma_ch *ch, *tmp_ch; int i, res;
int res;
WARN_ON_ONCE(irqs_disabled()); WARN_ON_ONCE(irqs_disabled());
BUG_ON(!sdev); BUG_ON(!sdev);
mutex_lock(&sdev->mutex); mutex_lock(&sdev->mutex);
list_for_each_entry_safe(ch, tmp_ch, &sdev->rch_list, list) for (i = 0; i < ARRAY_SIZE(sdev->port); i++)
__srpt_close_ch(ch); sdev->port[i].enabled = false;
__srpt_close_all_ch(sdev);
mutex_unlock(&sdev->mutex); mutex_unlock(&sdev->mutex);
res = wait_event_interruptible(sdev->ch_releaseQ, res = wait_event_interruptible(sdev->ch_releaseQ,
...@@ -2963,7 +2961,7 @@ static void srpt_close_session(struct se_session *se_sess) ...@@ -2963,7 +2961,7 @@ static void srpt_close_session(struct se_session *se_sess)
BUG_ON(ch->release_done); BUG_ON(ch->release_done);
ch->release_done = &release_done; ch->release_done = &release_done;
wait = !list_empty(&ch->list); wait = !list_empty(&ch->list);
__srpt_close_ch(ch); srpt_disconnect_ch(ch);
mutex_unlock(&sdev->mutex); mutex_unlock(&sdev->mutex);
if (!wait) if (!wait)
......
...@@ -218,20 +218,20 @@ struct srpt_send_ioctx { ...@@ -218,20 +218,20 @@ struct srpt_send_ioctx {
/** /**
* enum rdma_ch_state - SRP channel state. * enum rdma_ch_state - SRP channel state.
* @CH_CONNECTING: QP is in RTR state; waiting for RTU. * @CH_CONNECTING: QP is in RTR state; waiting for RTU.
* @CH_LIVE: QP is in RTS state. * @CH_LIVE: QP is in RTS state.
* @CH_DISCONNECTING: DREQ has been received; waiting for DREP * @CH_DISCONNECTING: DREQ has been sent and waiting for DREP or DREQ has
* or DREQ has been send and waiting for DREP * been received.
* or . * @CH_DRAINING: DREP has been received or waiting for DREP timed out
* @CH_DRAINING: QP is in ERR state; waiting for last WQE event. * and last work request has been queued.
* @CH_RELEASING: Last WQE event has been received; releasing resources. * @CH_DISCONNECTED: Last completion has been received.
*/ */
enum rdma_ch_state { enum rdma_ch_state {
CH_CONNECTING, CH_CONNECTING,
CH_LIVE, CH_LIVE,
CH_DISCONNECTING, CH_DISCONNECTING,
CH_DRAINING, CH_DRAINING,
CH_RELEASING CH_DISCONNECTED,
}; };
/** /**
...@@ -267,6 +267,8 @@ struct srpt_rdma_ch { ...@@ -267,6 +267,8 @@ struct srpt_rdma_ch {
struct ib_cm_id *cm_id; struct ib_cm_id *cm_id;
struct ib_qp *qp; struct ib_qp *qp;
struct ib_cq *cq; struct ib_cq *cq;
struct ib_cqe zw_cqe;
struct kref kref;
int rq_size; int rq_size;
u32 rsp_size; u32 rsp_size;
atomic_t sq_wr_avail; atomic_t sq_wr_avail;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment