Commit da2e5ae5 authored by Allen Hubbe, committed by Jon Mason

NTB: Fix ntb_transport out-of-order RX update

It was possible for a synchronous update of the RX index in the error
case to get ahead of the asynchronous RX index update in the normal
case.  Change the RX processing to preserve an RX completion order.

There were two error cases.  First, if a buffer is not present to
receive data, there would be no queue entry to preserve the RX
completion order.  Instead of dropping the RX frame, leave the RX frame
in the ring.  Schedule RX processing when RX entries are enqueued, in
case there are RX frames waiting in the ring to be received.

Second, if a buffer is too small to receive data, drop the frame in the
ring, mark the RX entry as done, and indicate the error in the RX entry
length.  Check for a negative length in the receive callback in
ntb_netdev, and count occurrences as rx_length_errors.
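
For illustration only, the ordering scheme can be reduced to the minimal
user-space sketch below.  This is not the kernel code: the names
(rx_entry, complete_in_order, copy_done, rx_handler) and the fixed-size
post queue are assumptions made for the example; only DESC_DONE_FLAG and
the idea of completing entries strictly from the head of a post queue
correspond to this change.

/* Minimal user-space sketch (not the kernel code) of the in-order RX
 * completion scheme described above.  Entries are appended to a post
 * queue in ring order; each copy may finish at any time and only sets a
 * DONE flag.  Completion is reported strictly from the head of the post
 * queue and stops at the first entry that is not yet done.  An
 * undersized buffer is completed immediately with a negative length, so
 * it flows through the same ordered path.
 */
#include <stdbool.h>
#include <stdio.h>

#define DESC_DONE_FLAG 0x1
#define NUM_ENTRIES 4

struct rx_entry {
	int index;		/* ring slot this entry consumed */
	int len;		/* payload length, or a negative error code */
	unsigned int flags;
};

static struct rx_entry post_q[NUM_ENTRIES];
static int post_head;		/* oldest not-yet-completed entry */
static int post_tail;		/* next free slot in the post queue */

/* Client callback: mirrors ntb_netdev's check for a negative length. */
static void rx_handler(int len)
{
	if (len < 0)
		printf("rx length error (%d)\n", len);
	else
		printf("rx ok, %d bytes\n", len);
}

/* Drain completions from the head of the post queue, in order only. */
static void complete_in_order(void)
{
	while (post_head != post_tail) {
		struct rx_entry *e = &post_q[post_head % NUM_ENTRIES];

		if (!(e->flags & DESC_DONE_FLAG))
			break;	/* an older copy is still in flight */

		/* Only here would the RX index be advanced for the peer. */
		rx_handler(e->len);
		post_head++;
	}
}

/* Called when a (possibly asynchronous) copy for one entry finishes. */
static void copy_done(struct rx_entry *e, int len)
{
	e->len = len;
	e->flags |= DESC_DONE_FLAG;
	complete_in_order();
}

int main(void)
{
	/* Post three entries in ring order 0, 1, 2. */
	for (int i = 0; i < 3; i++) {
		post_q[post_tail % NUM_ENTRIES] = (struct rx_entry){ .index = i };
		post_tail++;
	}

	/* Entry 1 finishes first: nothing is reported yet. */
	copy_done(&post_q[1], 64);
	/* Entry 0's buffer was too small: complete it with an error length. */
	copy_done(&post_q[0], -5 /* stand-in for -EIO */);
	/* Entry 2 finishes last: 0, 1, 2 are now reported in order. */
	copy_done(&post_q[2], 128);
	return 0;
}

Even though the copies finish out of order (1, then 0, then 2), the
callbacks fire in ring order 0, 1, 2, and the error entry is reported
through the same path with a negative length.
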
Signed-off-by: Allen Hubbe <Allen.Hubbe@emc.com>
Signed-off-by: Jon Mason <jdmason@kudzu.us>
parent f7644cbf
@@ -102,6 +102,12 @@ static void ntb_netdev_rx_handler(struct ntb_transport_qp *qp, void *qp_data,
 	netdev_dbg(ndev, "%s: %d byte payload received\n", __func__, len);
 
+	if (len < 0) {
+		ndev->stats.rx_errors++;
+		ndev->stats.rx_length_errors++;
+		goto enqueue_again;
+	}
+
 	skb_put(skb, len);
 	skb->protocol = eth_type_trans(skb, ndev);
 	skb->ip_summed = CHECKSUM_NONE;
@@ -121,6 +127,7 @@ static void ntb_netdev_rx_handler(struct ntb_transport_qp *qp, void *qp_data,
 		return;
 	}
 
+enqueue_again:
 	rc = ntb_transport_rx_enqueue(qp, skb, skb->data, ndev->mtu + ETH_HLEN);
 	if (rc) {
 		dev_kfree_skb(skb);
......
@@ -142,10 +142,11 @@ struct ntb_transport_qp {
 	void (*rx_handler)(struct ntb_transport_qp *qp, void *qp_data,
 			   void *data, int len);
+	struct list_head rx_post_q;
 	struct list_head rx_pend_q;
 	struct list_head rx_free_q;
-	spinlock_t ntb_rx_pend_q_lock;
-	spinlock_t ntb_rx_free_q_lock;
+	/* ntb_rx_q_lock: synchronize access to rx_XXXX_q */
+	spinlock_t ntb_rx_q_lock;
 	void *rx_buff;
 	unsigned int rx_index;
 	unsigned int rx_max_entry;
@@ -534,6 +535,27 @@ static struct ntb_queue_entry *ntb_list_rm(spinlock_t *lock,
 	return entry;
 }
 
+static struct ntb_queue_entry *ntb_list_mv(spinlock_t *lock,
+					   struct list_head *list,
+					   struct list_head *to_list)
+{
+	struct ntb_queue_entry *entry;
+	unsigned long flags;
+
+	spin_lock_irqsave(lock, flags);
+
+	if (list_empty(list)) {
+		entry = NULL;
+	} else {
+		entry = list_first_entry(list, struct ntb_queue_entry, entry);
+		list_move_tail(&entry->entry, to_list);
+	}
+
+	spin_unlock_irqrestore(lock, flags);
+
+	return entry;
+}
+
 static int ntb_transport_setup_qp_mw(struct ntb_transport_ctx *nt,
 				      unsigned int qp_num)
 {
@@ -941,10 +963,10 @@ static int ntb_transport_init_queue(struct ntb_transport_ctx *nt,
 	INIT_DELAYED_WORK(&qp->link_work, ntb_qp_link_work);
 	INIT_WORK(&qp->link_cleanup, ntb_qp_link_cleanup_work);
 
-	spin_lock_init(&qp->ntb_rx_pend_q_lock);
-	spin_lock_init(&qp->ntb_rx_free_q_lock);
+	spin_lock_init(&qp->ntb_rx_q_lock);
 	spin_lock_init(&qp->ntb_tx_free_q_lock);
 
+	INIT_LIST_HEAD(&qp->rx_post_q);
 	INIT_LIST_HEAD(&qp->rx_pend_q);
 	INIT_LIST_HEAD(&qp->rx_free_q);
 	INIT_LIST_HEAD(&qp->tx_free_q);
@@ -1107,22 +1129,47 @@ static void ntb_transport_free(struct ntb_client *self, struct ntb_dev *ndev)
 	kfree(nt);
 }
 
-static void ntb_rx_copy_callback(void *data)
+static void ntb_complete_rxc(struct ntb_transport_qp *qp)
 {
-	struct ntb_queue_entry *entry = data;
-	struct ntb_transport_qp *qp = entry->qp;
-	void *cb_data = entry->cb_data;
-	unsigned int len = entry->len;
-	struct ntb_payload_header *hdr = entry->rx_hdr;
+	struct ntb_queue_entry *entry;
+	void *cb_data;
+	unsigned int len;
+	unsigned long irqflags;
 
-	hdr->flags = 0;
-	iowrite32(entry->index, &qp->rx_info->entry);
+	spin_lock_irqsave(&qp->ntb_rx_q_lock, irqflags);
 
-	ntb_list_add(&qp->ntb_rx_free_q_lock, &entry->entry, &qp->rx_free_q);
+	while (!list_empty(&qp->rx_post_q)) {
+		entry = list_first_entry(&qp->rx_post_q,
+					 struct ntb_queue_entry, entry);
+		if (!(entry->flags & DESC_DONE_FLAG))
+			break;
+
+		entry->rx_hdr->flags = 0;
+		iowrite32(entry->index, &qp->rx_info->entry);
+
+		cb_data = entry->cb_data;
+		len = entry->len;
+
+		list_move_tail(&entry->entry, &qp->rx_free_q);
+
+		spin_unlock_irqrestore(&qp->ntb_rx_q_lock, irqflags);
+
+		if (qp->rx_handler && qp->client_ready)
+			qp->rx_handler(qp, qp->cb_data, cb_data, len);
+
+		spin_lock_irqsave(&qp->ntb_rx_q_lock, irqflags);
+	}
 
-	if (qp->rx_handler && qp->client_ready)
-		qp->rx_handler(qp, qp->cb_data, cb_data, len);
+	spin_unlock_irqrestore(&qp->ntb_rx_q_lock, irqflags);
+}
+
+static void ntb_rx_copy_callback(void *data)
+{
+	struct ntb_queue_entry *entry = data;
+
+	entry->flags |= DESC_DONE_FLAG;
+
+	ntb_complete_rxc(entry->qp);
 }
 
 static void ntb_memcpy_rx(struct ntb_queue_entry *entry, void *offset)
@@ -1138,19 +1185,18 @@ static void ntb_memcpy_rx(struct ntb_queue_entry *entry, void *offset)
 	ntb_rx_copy_callback(entry);
 }
 
-static void ntb_async_rx(struct ntb_queue_entry *entry, void *offset,
-			 size_t len)
+static void ntb_async_rx(struct ntb_queue_entry *entry, void *offset)
 {
 	struct dma_async_tx_descriptor *txd;
 	struct ntb_transport_qp *qp = entry->qp;
 	struct dma_chan *chan = qp->dma_chan;
 	struct dma_device *device;
-	size_t pay_off, buff_off;
+	size_t pay_off, buff_off, len;
 	struct dmaengine_unmap_data *unmap;
 	dma_cookie_t cookie;
 	void *buf = entry->buf;
 
-	entry->len = len;
+	len = entry->len;
 
 	if (!chan)
 		goto err;
@@ -1226,7 +1272,6 @@ static int ntb_process_rxc(struct ntb_transport_qp *qp)
 	struct ntb_payload_header *hdr;
 	struct ntb_queue_entry *entry;
 	void *offset;
-	int rc;
 
 	offset = qp->rx_buff + qp->rx_max_frame * qp->rx_index;
 	hdr = offset + qp->rx_max_frame - sizeof(struct ntb_payload_header);
@@ -1255,65 +1300,43 @@ static int ntb_process_rxc(struct ntb_transport_qp *qp)
 		return -EIO;
 	}
 
-	entry = ntb_list_rm(&qp->ntb_rx_pend_q_lock, &qp->rx_pend_q);
+	entry = ntb_list_mv(&qp->ntb_rx_q_lock, &qp->rx_pend_q, &qp->rx_post_q);
 	if (!entry) {
 		dev_dbg(&qp->ndev->pdev->dev, "no receive buffer\n");
 		qp->rx_err_no_buf++;
-
-		rc = -ENOMEM;
-		goto err;
+		return -EAGAIN;
 	}
 
+	entry->rx_hdr = hdr;
+	entry->index = qp->rx_index;
+
 	if (hdr->len > entry->len) {
 		dev_dbg(&qp->ndev->pdev->dev,
 			"receive buffer overflow! Wanted %d got %d\n",
 			hdr->len, entry->len);
 		qp->rx_err_oflow++;
 
-		rc = -EIO;
-		goto err;
-	}
-
-	dev_dbg(&qp->ndev->pdev->dev,
-		"RX OK index %u ver %u size %d into buf size %d\n",
-		qp->rx_index, hdr->ver, hdr->len, entry->len);
+		entry->len = -EIO;
+		entry->flags |= DESC_DONE_FLAG;
 
-	qp->rx_bytes += hdr->len;
-	qp->rx_pkts++;
+		ntb_complete_rxc(qp);
+	} else {
+		dev_dbg(&qp->ndev->pdev->dev,
+			"RX OK index %u ver %u size %d into buf size %d\n",
+			qp->rx_index, hdr->ver, hdr->len, entry->len);
 
-	entry->index = qp->rx_index;
-	entry->rx_hdr = hdr;
+		qp->rx_bytes += hdr->len;
+		qp->rx_pkts++;
 
-	ntb_async_rx(entry, offset, hdr->len);
+		entry->len = hdr->len;
+
+		ntb_async_rx(entry, offset);
+	}
 
 	qp->rx_index++;
 	qp->rx_index %= qp->rx_max_entry;
 
 	return 0;
-
-err:
-	/* FIXME: if this syncrhonous update of the rx_index gets ahead of
-	 * asyncrhonous ntb_rx_copy_callback of previous entry, there are three
-	 * scenarios:
-	 *
-	 * 1) The peer might miss this update, but observe the update
-	 * from the memcpy completion callback.  In this case, the buffer will
-	 * not be freed on the peer to be reused for a different packet.  The
-	 * successful rx of a later packet would clear the condition, but the
-	 * condition could persist if several rx fail in a row.
-	 *
-	 * 2) The peer may observe this update before the asyncrhonous copy of
-	 * prior packets is completed.  The peer may overwrite the buffers of
-	 * the prior packets before they are copied.
-	 *
-	 * 3) Both: the peer may observe the update, and then observe the index
-	 * decrement by the asynchronous completion callback.  Who knows what
-	 * badness that will cause.
-	 */
-	hdr->flags = 0;
-	iowrite32(qp->rx_index, &qp->rx_info->entry);
-
-	return rc;
 }
static void ntb_transport_rxc_db(unsigned long data) static void ntb_transport_rxc_db(unsigned long data)
...@@ -1333,7 +1356,7 @@ static void ntb_transport_rxc_db(unsigned long data) ...@@ -1333,7 +1356,7 @@ static void ntb_transport_rxc_db(unsigned long data)
break; break;
} }
if (qp->dma_chan) if (i && qp->dma_chan)
dma_async_issue_pending(qp->dma_chan); dma_async_issue_pending(qp->dma_chan);
if (i == qp->rx_max_entry) { if (i == qp->rx_max_entry) {
@@ -1609,7 +1632,7 @@ ntb_transport_create_queue(void *data, struct device *client_dev,
 			goto err1;
 
 		entry->qp = qp;
-		ntb_list_add(&qp->ntb_rx_free_q_lock, &entry->entry,
+		ntb_list_add(&qp->ntb_rx_q_lock, &entry->entry,
 			     &qp->rx_free_q);
 	}
@@ -1634,7 +1657,7 @@ ntb_transport_create_queue(void *data, struct device *client_dev,
 	while ((entry = ntb_list_rm(&qp->ntb_tx_free_q_lock, &qp->tx_free_q)))
 		kfree(entry);
 err1:
-	while ((entry = ntb_list_rm(&qp->ntb_rx_free_q_lock, &qp->rx_free_q)))
+	while ((entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_free_q)))
 		kfree(entry);
 	if (qp->dma_chan)
 		dma_release_channel(qp->dma_chan);
@@ -1689,11 +1712,16 @@ void ntb_transport_free_queue(struct ntb_transport_qp *qp)
 	qp->tx_handler = NULL;
 	qp->event_handler = NULL;
 
-	while ((entry = ntb_list_rm(&qp->ntb_rx_free_q_lock, &qp->rx_free_q)))
+	while ((entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_free_q)))
 		kfree(entry);
 
-	while ((entry = ntb_list_rm(&qp->ntb_rx_pend_q_lock, &qp->rx_pend_q))) {
-		dev_warn(&pdev->dev, "Freeing item from a non-empty queue\n");
+	while ((entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_pend_q))) {
+		dev_warn(&pdev->dev, "Freeing item from non-empty rx_pend_q\n");
+		kfree(entry);
+	}
+
+	while ((entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_post_q))) {
+		dev_warn(&pdev->dev, "Freeing item from non-empty rx_post_q\n");
 		kfree(entry);
 	}
@@ -1724,14 +1752,14 @@ void *ntb_transport_rx_remove(struct ntb_transport_qp *qp, unsigned int *len)
 	if (!qp || qp->client_ready)
 		return NULL;
 
-	entry = ntb_list_rm(&qp->ntb_rx_pend_q_lock, &qp->rx_pend_q);
+	entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_pend_q);
 	if (!entry)
 		return NULL;
 
 	buf = entry->cb_data;
 	*len = entry->len;
 
-	ntb_list_add(&qp->ntb_rx_free_q_lock, &entry->entry, &qp->rx_free_q);
+	ntb_list_add(&qp->ntb_rx_q_lock, &entry->entry, &qp->rx_free_q);
 
 	return buf;
 }
@@ -1757,15 +1785,18 @@ int ntb_transport_rx_enqueue(struct ntb_transport_qp *qp, void *cb, void *data,
 	if (!qp)
 		return -EINVAL;
 
-	entry = ntb_list_rm(&qp->ntb_rx_free_q_lock, &qp->rx_free_q);
+	entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_free_q);
 	if (!entry)
 		return -ENOMEM;
 
 	entry->cb_data = cb;
 	entry->buf = data;
 	entry->len = len;
+	entry->flags = 0;
+
+	ntb_list_add(&qp->ntb_rx_q_lock, &entry->entry, &qp->rx_pend_q);
 
-	ntb_list_add(&qp->ntb_rx_pend_q_lock, &entry->entry, &qp->rx_pend_q);
+	tasklet_schedule(&qp->rxc_db_work);
 
 	return 0;
 }
......