Commit 6c8968c4 authored by Karsten Graul's avatar Karsten Graul Committed by David S. Miller

net/smc: use worker to process incoming llc messages

Incoming llc messages are processed in irq tasklet context, and
a worker is used to send outgoing messages. The worker is needed
because getting a send buffer could result in a wait for a free buffer.

To make sure all incoming llc messages are processed in a serialized way
introduce an event queue and create a new queue entry for each message
which is queued to this event queue. A new worker processes the event
queue entries in order.
And remove the use of a separate worker to send outgoing llc messages
because the messages are processed in worker context already.
With this event queue the serialized llc_wq work queue is obsolete,
remove it.
Signed-off-by: default avatarKarsten Graul <kgraul@linux.ibm.com>
Reviewed-by: default avatarUrsula Braun <ubraun@linux.ibm.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 2140ac26
...@@ -412,7 +412,8 @@ static int smc_lgr_create(struct smc_sock *smc, struct smc_init_info *ini) ...@@ -412,7 +412,8 @@ static int smc_lgr_create(struct smc_sock *smc, struct smc_init_info *ini)
lgr->role = smc->listen_smc ? SMC_SERV : SMC_CLNT; lgr->role = smc->listen_smc ? SMC_SERV : SMC_CLNT;
memcpy(lgr->peer_systemid, ini->ib_lcl->id_for_peer, memcpy(lgr->peer_systemid, ini->ib_lcl->id_for_peer,
SMC_SYSTEMID_LEN); SMC_SYSTEMID_LEN);
INIT_LIST_HEAD(&lgr->llc_event_q);
spin_lock_init(&lgr->llc_event_q_lock);
link_idx = SMC_SINGLE_LINK; link_idx = SMC_SINGLE_LINK;
lnk = &lgr->lnk[link_idx]; lnk = &lgr->lnk[link_idx];
rc = smcr_link_init(lgr, lnk, link_idx, ini); rc = smcr_link_init(lgr, lnk, link_idx, ini);
...@@ -613,6 +614,7 @@ static void smc_lgr_free(struct smc_link_group *lgr) ...@@ -613,6 +614,7 @@ static void smc_lgr_free(struct smc_link_group *lgr)
if (lgr->lnk[i].state != SMC_LNK_UNUSED) if (lgr->lnk[i].state != SMC_LNK_UNUSED)
smcr_link_clear(&lgr->lnk[i]); smcr_link_clear(&lgr->lnk[i]);
} }
smc_llc_event_flush(lgr);
if (!atomic_dec_return(&lgr_cnt)) if (!atomic_dec_return(&lgr_cnt))
wake_up(&lgrs_deleted); wake_up(&lgrs_deleted);
} }
......
...@@ -120,7 +120,6 @@ struct smc_link { ...@@ -120,7 +120,6 @@ struct smc_link {
struct smc_link_group *lgr; /* parent link group */ struct smc_link_group *lgr; /* parent link group */
enum smc_link_state state; /* state of link */ enum smc_link_state state; /* state of link */
struct workqueue_struct *llc_wq; /* single thread work queue */
struct completion llc_confirm; /* wait for rx of conf link */ struct completion llc_confirm; /* wait for rx of conf link */
struct completion llc_confirm_resp; /* wait 4 rx of cnf lnk rsp */ struct completion llc_confirm_resp; /* wait 4 rx of cnf lnk rsp */
int llc_confirm_rc; /* rc from confirm link msg */ int llc_confirm_rc; /* rc from confirm link msg */
...@@ -233,6 +232,12 @@ struct smc_link_group { ...@@ -233,6 +232,12 @@ struct smc_link_group {
DECLARE_BITMAP(rtokens_used_mask, SMC_RMBS_PER_LGR_MAX); DECLARE_BITMAP(rtokens_used_mask, SMC_RMBS_PER_LGR_MAX);
/* used rtoken elements */ /* used rtoken elements */
u8 next_link_id; u8 next_link_id;
struct list_head llc_event_q;
/* queue for llc events */
spinlock_t llc_event_q_lock;
/* protects llc_event_q */
struct work_struct llc_event_work;
/* llc event worker */
}; };
struct { /* SMC-D */ struct { /* SMC-D */
u64 peer_gid; u64 peer_gid;
......
...@@ -134,6 +134,12 @@ union smc_llc_msg { ...@@ -134,6 +134,12 @@ union smc_llc_msg {
#define SMC_LLC_FLAG_RESP 0x80 #define SMC_LLC_FLAG_RESP 0x80
struct smc_llc_qentry {
struct list_head list;
struct smc_link *link;
union smc_llc_msg msg;
};
/********************************** send *************************************/ /********************************** send *************************************/
struct smc_llc_tx_pend { struct smc_llc_tx_pend {
...@@ -356,46 +362,20 @@ static int smc_llc_send_test_link(struct smc_link *link, u8 user_data[16]) ...@@ -356,46 +362,20 @@ static int smc_llc_send_test_link(struct smc_link *link, u8 user_data[16])
return rc; return rc;
} }
struct smc_llc_send_work { /* schedule an llc send on link, may wait for buffers */
struct work_struct work; static int smc_llc_send_message(struct smc_link *link, void *llcbuf)
struct smc_link *link;
int llclen;
union smc_llc_msg llcbuf;
};
/* worker that sends a prepared message */
static void smc_llc_send_message_work(struct work_struct *work)
{ {
struct smc_llc_send_work *llcwrk = container_of(work,
struct smc_llc_send_work, work);
struct smc_wr_tx_pend_priv *pend; struct smc_wr_tx_pend_priv *pend;
struct smc_wr_buf *wr_buf; struct smc_wr_buf *wr_buf;
int rc; int rc;
if (!smc_link_usable(llcwrk->link)) if (!smc_link_usable(link))
goto out; return -ENOLINK;
rc = smc_llc_add_pending_send(llcwrk->link, &wr_buf, &pend); rc = smc_llc_add_pending_send(link, &wr_buf, &pend);
if (rc) if (rc)
goto out; return rc;
memcpy(wr_buf, &llcwrk->llcbuf, llcwrk->llclen); memcpy(wr_buf, llcbuf, sizeof(union smc_llc_msg));
smc_wr_tx_send(llcwrk->link, pend); return smc_wr_tx_send(link, pend);
out:
kfree(llcwrk);
}
/* copy llcbuf and schedule an llc send on link */
static int smc_llc_send_message(struct smc_link *link, void *llcbuf, int llclen)
{
struct smc_llc_send_work *wrk = kmalloc(sizeof(*wrk), GFP_ATOMIC);
if (!wrk)
return -ENOMEM;
INIT_WORK(&wrk->work, smc_llc_send_message_work);
wrk->link = link;
wrk->llclen = llclen;
memcpy(&wrk->llcbuf, llcbuf, llclen);
queue_work(link->llc_wq, &wrk->work);
return 0;
} }
/********************************* receive ***********************************/ /********************************* receive ***********************************/
...@@ -452,7 +432,7 @@ static void smc_llc_rx_add_link(struct smc_link *link, ...@@ -452,7 +432,7 @@ static void smc_llc_rx_add_link(struct smc_link *link,
link->smcibdev->mac[link->ibport - 1], link->smcibdev->mac[link->ibport - 1],
link->gid, SMC_LLC_RESP); link->gid, SMC_LLC_RESP);
} }
smc_llc_send_message(link, llc, sizeof(*llc)); smc_llc_send_message(link, llc);
} }
} }
...@@ -474,7 +454,7 @@ static void smc_llc_rx_delete_link(struct smc_link *link, ...@@ -474,7 +454,7 @@ static void smc_llc_rx_delete_link(struct smc_link *link,
/* server requests to delete this link, send response */ /* server requests to delete this link, send response */
smc_llc_prep_delete_link(llc, link, SMC_LLC_RESP, true); smc_llc_prep_delete_link(llc, link, SMC_LLC_RESP, true);
} }
smc_llc_send_message(link, llc, sizeof(*llc)); smc_llc_send_message(link, llc);
smc_lgr_terminate_sched(lgr); smc_lgr_terminate_sched(lgr);
} }
} }
...@@ -487,7 +467,7 @@ static void smc_llc_rx_test_link(struct smc_link *link, ...@@ -487,7 +467,7 @@ static void smc_llc_rx_test_link(struct smc_link *link,
complete(&link->llc_testlink_resp); complete(&link->llc_testlink_resp);
} else { } else {
llc->hd.flags |= SMC_LLC_FLAG_RESP; llc->hd.flags |= SMC_LLC_FLAG_RESP;
smc_llc_send_message(link, llc, sizeof(*llc)); smc_llc_send_message(link, llc);
} }
} }
...@@ -510,7 +490,7 @@ static void smc_llc_rx_confirm_rkey(struct smc_link *link, ...@@ -510,7 +490,7 @@ static void smc_llc_rx_confirm_rkey(struct smc_link *link,
llc->hd.flags |= SMC_LLC_FLAG_RESP; llc->hd.flags |= SMC_LLC_FLAG_RESP;
if (rc < 0) if (rc < 0)
llc->hd.flags |= SMC_LLC_FLAG_RKEY_NEG; llc->hd.flags |= SMC_LLC_FLAG_RKEY_NEG;
smc_llc_send_message(link, llc, sizeof(*llc)); smc_llc_send_message(link, llc);
} }
} }
...@@ -522,7 +502,7 @@ static void smc_llc_rx_confirm_rkey_cont(struct smc_link *link, ...@@ -522,7 +502,7 @@ static void smc_llc_rx_confirm_rkey_cont(struct smc_link *link,
} else { } else {
/* ignore rtokens for other links, we have only one link */ /* ignore rtokens for other links, we have only one link */
llc->hd.flags |= SMC_LLC_FLAG_RESP; llc->hd.flags |= SMC_LLC_FLAG_RESP;
smc_llc_send_message(link, llc, sizeof(*llc)); smc_llc_send_message(link, llc);
} }
} }
...@@ -549,21 +529,30 @@ static void smc_llc_rx_delete_rkey(struct smc_link *link, ...@@ -549,21 +529,30 @@ static void smc_llc_rx_delete_rkey(struct smc_link *link,
} }
llc->hd.flags |= SMC_LLC_FLAG_RESP; llc->hd.flags |= SMC_LLC_FLAG_RESP;
smc_llc_send_message(link, llc, sizeof(*llc)); smc_llc_send_message(link, llc);
} }
} }
static void smc_llc_rx_handler(struct ib_wc *wc, void *buf) /* flush the llc event queue */
void smc_llc_event_flush(struct smc_link_group *lgr)
{ {
struct smc_link *link = (struct smc_link *)wc->qp->qp_context; struct smc_llc_qentry *qentry, *q;
union smc_llc_msg *llc = buf;
spin_lock_bh(&lgr->llc_event_q_lock);
list_for_each_entry_safe(qentry, q, &lgr->llc_event_q, list) {
list_del_init(&qentry->list);
kfree(qentry);
}
spin_unlock_bh(&lgr->llc_event_q_lock);
}
static void smc_llc_event_handler(struct smc_llc_qentry *qentry)
{
union smc_llc_msg *llc = &qentry->msg;
struct smc_link *link = qentry->link;
if (wc->byte_len < sizeof(*llc))
return; /* short message */
if (llc->raw.hdr.length != sizeof(*llc))
return; /* invalid message */
if (!smc_link_usable(link)) if (!smc_link_usable(link))
return; /* link not active, drop msg */ goto out;
switch (llc->raw.hdr.common.type) { switch (llc->raw.hdr.common.type) {
case SMC_LLC_TEST_LINK: case SMC_LLC_TEST_LINK:
...@@ -588,6 +577,54 @@ static void smc_llc_rx_handler(struct ib_wc *wc, void *buf) ...@@ -588,6 +577,54 @@ static void smc_llc_rx_handler(struct ib_wc *wc, void *buf)
smc_llc_rx_delete_rkey(link, &llc->delete_rkey); smc_llc_rx_delete_rkey(link, &llc->delete_rkey);
break; break;
} }
out:
kfree(qentry);
}
/* worker to process llc messages on the event queue */
static void smc_llc_event_work(struct work_struct *work)
{
struct smc_link_group *lgr = container_of(work, struct smc_link_group,
llc_event_work);
struct smc_llc_qentry *qentry;
again:
spin_lock_bh(&lgr->llc_event_q_lock);
if (!list_empty(&lgr->llc_event_q)) {
qentry = list_first_entry(&lgr->llc_event_q,
struct smc_llc_qentry, list);
list_del_init(&qentry->list);
spin_unlock_bh(&lgr->llc_event_q_lock);
smc_llc_event_handler(qentry);
goto again;
}
spin_unlock_bh(&lgr->llc_event_q_lock);
}
/* copy received msg and add it to the event queue */
static void smc_llc_rx_handler(struct ib_wc *wc, void *buf)
{
struct smc_link *link = (struct smc_link *)wc->qp->qp_context;
struct smc_link_group *lgr = link->lgr;
struct smc_llc_qentry *qentry;
union smc_llc_msg *llc = buf;
unsigned long flags;
if (wc->byte_len < sizeof(*llc))
return; /* short message */
if (llc->raw.hdr.length != sizeof(*llc))
return; /* invalid message */
qentry = kmalloc(sizeof(*qentry), GFP_ATOMIC);
if (!qentry)
return;
qentry->link = link;
INIT_LIST_HEAD(&qentry->list);
memcpy(&qentry->msg, llc, sizeof(union smc_llc_msg));
spin_lock_irqsave(&lgr->llc_event_q_lock, flags);
list_add_tail(&qentry->list, &lgr->llc_event_q);
spin_unlock_irqrestore(&lgr->llc_event_q_lock, flags);
schedule_work(&link->lgr->llc_event_work);
} }
/***************************** worker, utils *********************************/ /***************************** worker, utils *********************************/
...@@ -626,12 +663,6 @@ static void smc_llc_testlink_work(struct work_struct *work) ...@@ -626,12 +663,6 @@ static void smc_llc_testlink_work(struct work_struct *work)
int smc_llc_link_init(struct smc_link *link) int smc_llc_link_init(struct smc_link *link)
{ {
struct smc_link_group *lgr = smc_get_lgr(link);
link->llc_wq = alloc_ordered_workqueue("llc_wq-%x:%x)", WQ_MEM_RECLAIM,
*((u32 *)lgr->id),
link->link_id);
if (!link->llc_wq)
return -ENOMEM;
init_completion(&link->llc_confirm); init_completion(&link->llc_confirm);
init_completion(&link->llc_confirm_resp); init_completion(&link->llc_confirm_resp);
init_completion(&link->llc_add); init_completion(&link->llc_add);
...@@ -640,6 +671,7 @@ int smc_llc_link_init(struct smc_link *link) ...@@ -640,6 +671,7 @@ int smc_llc_link_init(struct smc_link *link)
init_completion(&link->llc_delete_rkey); init_completion(&link->llc_delete_rkey);
mutex_init(&link->llc_delete_rkey_mutex); mutex_init(&link->llc_delete_rkey_mutex);
init_completion(&link->llc_testlink_resp); init_completion(&link->llc_testlink_resp);
INIT_WORK(&link->lgr->llc_event_work, smc_llc_event_work);
INIT_DELAYED_WORK(&link->llc_testlink_wrk, smc_llc_testlink_work); INIT_DELAYED_WORK(&link->llc_testlink_wrk, smc_llc_testlink_work);
return 0; return 0;
} }
...@@ -663,8 +695,6 @@ void smc_llc_link_deleting(struct smc_link *link) ...@@ -663,8 +695,6 @@ void smc_llc_link_deleting(struct smc_link *link)
/* called in worker context */ /* called in worker context */
void smc_llc_link_clear(struct smc_link *link) void smc_llc_link_clear(struct smc_link *link)
{ {
flush_workqueue(link->llc_wq);
destroy_workqueue(link->llc_wq);
complete(&link->llc_testlink_resp); complete(&link->llc_testlink_resp);
cancel_delayed_work_sync(&link->llc_testlink_wrk); cancel_delayed_work_sync(&link->llc_testlink_wrk);
smc_wr_wakeup_reg_wait(link); smc_wr_wakeup_reg_wait(link);
......
...@@ -61,6 +61,7 @@ int smc_llc_do_confirm_rkey(struct smc_link *link, ...@@ -61,6 +61,7 @@ int smc_llc_do_confirm_rkey(struct smc_link *link,
struct smc_buf_desc *rmb_desc); struct smc_buf_desc *rmb_desc);
int smc_llc_do_delete_rkey(struct smc_link *link, int smc_llc_do_delete_rkey(struct smc_link *link,
struct smc_buf_desc *rmb_desc); struct smc_buf_desc *rmb_desc);
void smc_llc_event_flush(struct smc_link_group *lgr);
int smc_llc_init(void) __init; int smc_llc_init(void) __init;
#endif /* SMC_LLC_H */ #endif /* SMC_LLC_H */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment