Commit c6f02ebe authored by Karsten Graul's avatar Karsten Graul Committed by David S. Miller

net/smc: switch connections to alternate link

Add smc_switch_conns() to switch all connections from a link that is
going down. Find an other link to switch the connections to, and
switch each connection to the new link. smc_switch_cursor() updates the
cursors of a connection to the state of the last successfully sent CDC
message. When there is no link to switch to, terminate the link group.
Call smc_switch_conns() when a link is going down.
And with the possibility that links of connections can switch adapt CDC
and TX functions to detect and handle link switches.
Signed-off-by: default avatarKarsten Graul <kgraul@linux.ibm.com>
Reviewed-by: default avatarUrsula Braun <ubraun@linux.ibm.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent f0ec4f1d
...@@ -56,11 +56,11 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd, ...@@ -56,11 +56,11 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
} }
int smc_cdc_get_free_slot(struct smc_connection *conn, int smc_cdc_get_free_slot(struct smc_connection *conn,
struct smc_link *link,
struct smc_wr_buf **wr_buf, struct smc_wr_buf **wr_buf,
struct smc_rdma_wr **wr_rdma_buf, struct smc_rdma_wr **wr_rdma_buf,
struct smc_cdc_tx_pend **pend) struct smc_cdc_tx_pend **pend)
{ {
struct smc_link *link = conn->lnk;
int rc; int rc;
rc = smc_wr_tx_get_free_slot(link, smc_cdc_tx_handler, wr_buf, rc = smc_wr_tx_get_free_slot(link, smc_cdc_tx_handler, wr_buf,
...@@ -119,13 +119,27 @@ static int smcr_cdc_get_slot_and_msg_send(struct smc_connection *conn) ...@@ -119,13 +119,27 @@ static int smcr_cdc_get_slot_and_msg_send(struct smc_connection *conn)
{ {
struct smc_cdc_tx_pend *pend; struct smc_cdc_tx_pend *pend;
struct smc_wr_buf *wr_buf; struct smc_wr_buf *wr_buf;
struct smc_link *link;
bool again = false;
int rc; int rc;
rc = smc_cdc_get_free_slot(conn, &wr_buf, NULL, &pend); again:
link = conn->lnk;
rc = smc_cdc_get_free_slot(conn, link, &wr_buf, NULL, &pend);
if (rc) if (rc)
return rc; return rc;
spin_lock_bh(&conn->send_lock); spin_lock_bh(&conn->send_lock);
if (link != conn->lnk) {
/* link of connection changed, try again one time*/
spin_unlock_bh(&conn->send_lock);
smc_wr_tx_put_slot(link,
(struct smc_wr_tx_pend_priv *)pend);
if (again)
return -ENOLINK;
again = true;
goto again;
}
rc = smc_cdc_msg_send(conn, wr_buf, pend); rc = smc_cdc_msg_send(conn, wr_buf, pend);
spin_unlock_bh(&conn->send_lock); spin_unlock_bh(&conn->send_lock);
return rc; return rc;
......
...@@ -304,6 +304,7 @@ struct smc_cdc_tx_pend { ...@@ -304,6 +304,7 @@ struct smc_cdc_tx_pend {
}; };
int smc_cdc_get_free_slot(struct smc_connection *conn, int smc_cdc_get_free_slot(struct smc_connection *conn,
struct smc_link *link,
struct smc_wr_buf **wr_buf, struct smc_wr_buf **wr_buf,
struct smc_rdma_wr **wr_rdma_buf, struct smc_rdma_wr **wr_rdma_buf,
struct smc_cdc_tx_pend **pend); struct smc_cdc_tx_pend **pend);
......
...@@ -432,6 +432,135 @@ static int smc_lgr_create(struct smc_sock *smc, struct smc_init_info *ini) ...@@ -432,6 +432,135 @@ static int smc_lgr_create(struct smc_sock *smc, struct smc_init_info *ini)
return rc; return rc;
} }
static int smc_write_space(struct smc_connection *conn)
{
int buffer_len = conn->peer_rmbe_size;
union smc_host_cursor prod;
union smc_host_cursor cons;
int space;
smc_curs_copy(&prod, &conn->local_tx_ctrl.prod, conn);
smc_curs_copy(&cons, &conn->local_rx_ctrl.cons, conn);
/* determine rx_buf space */
space = buffer_len - smc_curs_diff(buffer_len, &cons, &prod);
return space;
}
static int smc_switch_cursor(struct smc_sock *smc)
{
struct smc_connection *conn = &smc->conn;
union smc_host_cursor cons, fin;
int rc = 0;
int diff;
smc_curs_copy(&conn->tx_curs_sent, &conn->tx_curs_fin, conn);
smc_curs_copy(&fin, &conn->local_tx_ctrl_fin, conn);
/* set prod cursor to old state, enforce tx_rdma_writes() */
smc_curs_copy(&conn->local_tx_ctrl.prod, &fin, conn);
smc_curs_copy(&cons, &conn->local_rx_ctrl.cons, conn);
if (smc_curs_comp(conn->peer_rmbe_size, &cons, &fin) < 0) {
/* cons cursor advanced more than fin, and prod was set
* fin above, so now prod is smaller than cons. Fix that.
*/
diff = smc_curs_diff(conn->peer_rmbe_size, &fin, &cons);
smc_curs_add(conn->sndbuf_desc->len,
&conn->tx_curs_sent, diff);
smc_curs_add(conn->sndbuf_desc->len,
&conn->tx_curs_fin, diff);
smp_mb__before_atomic();
atomic_add(diff, &conn->sndbuf_space);
smp_mb__after_atomic();
smc_curs_add(conn->peer_rmbe_size,
&conn->local_tx_ctrl.prod, diff);
smc_curs_add(conn->peer_rmbe_size,
&conn->local_tx_ctrl_fin, diff);
}
/* recalculate, value is used by tx_rdma_writes() */
atomic_set(&smc->conn.peer_rmbe_space, smc_write_space(conn));
if (smc->sk.sk_state != SMC_INIT &&
smc->sk.sk_state != SMC_CLOSED) {
/* tbd: call rc = smc_cdc_get_slot_and_msg_send(conn); */
if (!rc) {
schedule_delayed_work(&conn->tx_work, 0);
smc->sk.sk_data_ready(&smc->sk);
}
}
return rc;
}
struct smc_link *smc_switch_conns(struct smc_link_group *lgr,
struct smc_link *from_lnk, bool is_dev_err)
{
struct smc_link *to_lnk = NULL;
struct smc_connection *conn;
struct smc_sock *smc;
struct rb_node *node;
int i, rc = 0;
/* link is inactive, wake up tx waiters */
smc_wr_wakeup_tx_wait(from_lnk);
for (i = 0; i < SMC_LINKS_PER_LGR_MAX; i++) {
if (lgr->lnk[i].state != SMC_LNK_ACTIVE ||
i == from_lnk->link_idx)
continue;
if (is_dev_err && from_lnk->smcibdev == lgr->lnk[i].smcibdev &&
from_lnk->ibport == lgr->lnk[i].ibport) {
continue;
}
to_lnk = &lgr->lnk[i];
break;
}
if (!to_lnk) {
smc_lgr_terminate_sched(lgr);
return NULL;
}
again:
read_lock_bh(&lgr->conns_lock);
for (node = rb_first(&lgr->conns_all); node; node = rb_next(node)) {
conn = rb_entry(node, struct smc_connection, alert_node);
if (conn->lnk != from_lnk)
continue;
smc = container_of(conn, struct smc_sock, conn);
/* conn->lnk not yet set in SMC_INIT state */
if (smc->sk.sk_state == SMC_INIT)
continue;
if (smc->sk.sk_state == SMC_CLOSED ||
smc->sk.sk_state == SMC_PEERCLOSEWAIT1 ||
smc->sk.sk_state == SMC_PEERCLOSEWAIT2 ||
smc->sk.sk_state == SMC_APPFINCLOSEWAIT ||
smc->sk.sk_state == SMC_APPCLOSEWAIT1 ||
smc->sk.sk_state == SMC_APPCLOSEWAIT2 ||
smc->sk.sk_state == SMC_PEERFINCLOSEWAIT ||
smc->sk.sk_state == SMC_PEERABORTWAIT ||
smc->sk.sk_state == SMC_PROCESSABORT) {
spin_lock_bh(&conn->send_lock);
conn->lnk = to_lnk;
spin_unlock_bh(&conn->send_lock);
continue;
}
sock_hold(&smc->sk);
read_unlock_bh(&lgr->conns_lock);
/* avoid race with smcr_tx_sndbuf_nonempty() */
spin_lock_bh(&conn->send_lock);
conn->lnk = to_lnk;
rc = smc_switch_cursor(smc);
spin_unlock_bh(&conn->send_lock);
sock_put(&smc->sk);
if (rc) {
smcr_link_down_cond_sched(to_lnk);
return NULL;
}
goto again;
}
read_unlock_bh(&lgr->conns_lock);
return to_lnk;
}
static void smcr_buf_unuse(struct smc_buf_desc *rmb_desc, static void smcr_buf_unuse(struct smc_buf_desc *rmb_desc,
struct smc_link_group *lgr) struct smc_link_group *lgr)
{ {
...@@ -943,8 +1072,7 @@ static void smcr_link_down(struct smc_link *lnk) ...@@ -943,8 +1072,7 @@ static void smcr_link_down(struct smc_link *lnk)
return; return;
smc_ib_modify_qp_reset(lnk); smc_ib_modify_qp_reset(lnk);
to_lnk = NULL; to_lnk = smc_switch_conns(lgr, lnk, true);
/* tbd: call to_lnk = smc_switch_conns(lgr, lnk, true); */
if (!to_lnk) { /* no backup link available */ if (!to_lnk) { /* no backup link available */
smcr_link_clear(lnk); smcr_link_clear(lnk);
return; return;
......
...@@ -380,6 +380,8 @@ void smcr_link_clear(struct smc_link *lnk); ...@@ -380,6 +380,8 @@ void smcr_link_clear(struct smc_link *lnk);
int smcr_buf_map_lgr(struct smc_link *lnk); int smcr_buf_map_lgr(struct smc_link *lnk);
int smcr_buf_reg_lgr(struct smc_link *lnk); int smcr_buf_reg_lgr(struct smc_link *lnk);
int smcr_link_reg_rmb(struct smc_link *link, struct smc_buf_desc *rmb_desc); int smcr_link_reg_rmb(struct smc_link *link, struct smc_buf_desc *rmb_desc);
struct smc_link *smc_switch_conns(struct smc_link_group *lgr,
struct smc_link *from_lnk, bool is_dev_err);
void smcr_link_down_cond(struct smc_link *lnk); void smcr_link_down_cond(struct smc_link *lnk);
void smcr_link_down_cond_sched(struct smc_link *lnk); void smcr_link_down_cond_sched(struct smc_link *lnk);
......
...@@ -933,7 +933,7 @@ static void smc_llc_delete_asym_link(struct smc_link_group *lgr) ...@@ -933,7 +933,7 @@ static void smc_llc_delete_asym_link(struct smc_link_group *lgr)
return; /* no asymmetric link */ return; /* no asymmetric link */
if (!smc_link_downing(&lnk_asym->state)) if (!smc_link_downing(&lnk_asym->state))
return; return;
/* tbd: lnk_new = smc_switch_conns(lgr, lnk_asym, false); */ lnk_new = smc_switch_conns(lgr, lnk_asym, false);
smc_wr_tx_wait_no_pending_sends(lnk_asym); smc_wr_tx_wait_no_pending_sends(lnk_asym);
if (!lnk_new) if (!lnk_new)
goto out_free; goto out_free;
...@@ -1195,7 +1195,7 @@ static void smc_llc_process_cli_delete_link(struct smc_link_group *lgr) ...@@ -1195,7 +1195,7 @@ static void smc_llc_process_cli_delete_link(struct smc_link_group *lgr)
smc_llc_send_message(lnk, &qentry->msg); /* response */ smc_llc_send_message(lnk, &qentry->msg); /* response */
if (smc_link_downing(&lnk_del->state)) { if (smc_link_downing(&lnk_del->state)) {
/* tbd: call smc_switch_conns(lgr, lnk_del, false); */ smc_switch_conns(lgr, lnk_del, false);
smc_wr_tx_wait_no_pending_sends(lnk_del); smc_wr_tx_wait_no_pending_sends(lnk_del);
} }
smcr_link_clear(lnk_del); smcr_link_clear(lnk_del);
...@@ -1245,7 +1245,7 @@ static void smc_llc_process_srv_delete_link(struct smc_link_group *lgr) ...@@ -1245,7 +1245,7 @@ static void smc_llc_process_srv_delete_link(struct smc_link_group *lgr)
goto out; /* asymmetric link already deleted */ goto out; /* asymmetric link already deleted */
if (smc_link_downing(&lnk_del->state)) { if (smc_link_downing(&lnk_del->state)) {
/* tbd: call smc_switch_conns(lgr, lnk_del, false); */ smc_switch_conns(lgr, lnk_del, false);
smc_wr_tx_wait_no_pending_sends(lnk_del); smc_wr_tx_wait_no_pending_sends(lnk_del);
} }
if (!list_empty(&lgr->list)) { if (!list_empty(&lgr->list)) {
......
...@@ -482,12 +482,13 @@ static int smc_tx_rdma_writes(struct smc_connection *conn, ...@@ -482,12 +482,13 @@ static int smc_tx_rdma_writes(struct smc_connection *conn,
static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn) static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
{ {
struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags; struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags;
struct smc_link *link = conn->lnk;
struct smc_rdma_wr *wr_rdma_buf; struct smc_rdma_wr *wr_rdma_buf;
struct smc_cdc_tx_pend *pend; struct smc_cdc_tx_pend *pend;
struct smc_wr_buf *wr_buf; struct smc_wr_buf *wr_buf;
int rc; int rc;
rc = smc_cdc_get_free_slot(conn, &wr_buf, &wr_rdma_buf, &pend); rc = smc_cdc_get_free_slot(conn, link, &wr_buf, &wr_rdma_buf, &pend);
if (rc < 0) { if (rc < 0) {
if (rc == -EBUSY) { if (rc == -EBUSY) {
struct smc_sock *smc = struct smc_sock *smc =
...@@ -505,10 +506,17 @@ static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn) ...@@ -505,10 +506,17 @@ static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
} }
spin_lock_bh(&conn->send_lock); spin_lock_bh(&conn->send_lock);
if (link != conn->lnk) {
/* link of connection changed, tx_work will restart */
smc_wr_tx_put_slot(link,
(struct smc_wr_tx_pend_priv *)pend);
rc = -ENOLINK;
goto out_unlock;
}
if (!pflags->urg_data_present) { if (!pflags->urg_data_present) {
rc = smc_tx_rdma_writes(conn, wr_rdma_buf); rc = smc_tx_rdma_writes(conn, wr_rdma_buf);
if (rc) { if (rc) {
smc_wr_tx_put_slot(conn->lnk, smc_wr_tx_put_slot(link,
(struct smc_wr_tx_pend_priv *)pend); (struct smc_wr_tx_pend_priv *)pend);
goto out_unlock; goto out_unlock;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment