Commit 4cc40af0 authored by David S. Miller's avatar David S. Miller

Merge branch 'xen-netback'

David Vrabel says:

====================
xen-netback: guest Rx queue drain and stall fixes

This series fixes two critical xen-netback bugs.

1. Netback may consume all of host memory by queuing an unlimited
   number of skb on the internal guest Rx queue.  This behaviour is
   guest triggerable.

2. Carrier flapping under high traffic rates which reduces
   performance.

The first patch is a prerequite.  Removing support for frontends with
feature-rx-notify makes it easier to reason about the correctness of
netback since it no longer has to support this outdated and broken
mode.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 5345c1d4 ecf08d2d
...@@ -176,10 +176,11 @@ struct xenvif_queue { /* Per-queue data for xenvif */ ...@@ -176,10 +176,11 @@ struct xenvif_queue { /* Per-queue data for xenvif */
char rx_irq_name[IRQ_NAME_SIZE]; /* DEVNAME-qN-rx */ char rx_irq_name[IRQ_NAME_SIZE]; /* DEVNAME-qN-rx */
struct xen_netif_rx_back_ring rx; struct xen_netif_rx_back_ring rx;
struct sk_buff_head rx_queue; struct sk_buff_head rx_queue;
RING_IDX rx_last_skb_slots;
unsigned long status;
struct timer_list rx_stalled; unsigned int rx_queue_max;
unsigned int rx_queue_len;
unsigned long last_rx_time;
bool stalled;
struct gnttab_copy grant_copy_op[MAX_GRANT_COPY_OPS]; struct gnttab_copy grant_copy_op[MAX_GRANT_COPY_OPS];
...@@ -199,18 +200,14 @@ struct xenvif_queue { /* Per-queue data for xenvif */ ...@@ -199,18 +200,14 @@ struct xenvif_queue { /* Per-queue data for xenvif */
struct xenvif_stats stats; struct xenvif_stats stats;
}; };
/* Maximum number of Rx slots a to-guest packet may use, including the
* slot needed for GSO meta-data.
*/
#define XEN_NETBK_RX_SLOTS_MAX (MAX_SKB_FRAGS + 1)
enum state_bit_shift { enum state_bit_shift {
/* This bit marks that the vif is connected */ /* This bit marks that the vif is connected */
VIF_STATUS_CONNECTED, VIF_STATUS_CONNECTED,
/* This bit signals the RX thread that queuing was stopped (in
* start_xmit), and either the timer fired or an RX interrupt came
*/
QUEUE_STATUS_RX_PURGE_EVENT,
/* This bit tells the interrupt handler that this queue was the reason
* for the carrier off, so it should kick the thread. Only queues which
* brought it down can turn on the carrier.
*/
QUEUE_STATUS_RX_STALLED
}; };
struct xenvif { struct xenvif {
...@@ -228,9 +225,6 @@ struct xenvif { ...@@ -228,9 +225,6 @@ struct xenvif {
u8 ip_csum:1; u8 ip_csum:1;
u8 ipv6_csum:1; u8 ipv6_csum:1;
/* Internal feature information. */
u8 can_queue:1; /* can queue packets for receiver? */
/* Is this interface disabled? True when backend discovers /* Is this interface disabled? True when backend discovers
* frontend is rogue. * frontend is rogue.
*/ */
...@@ -240,6 +234,9 @@ struct xenvif { ...@@ -240,6 +234,9 @@ struct xenvif {
/* Queues */ /* Queues */
struct xenvif_queue *queues; struct xenvif_queue *queues;
unsigned int num_queues; /* active queues, resource allocated */ unsigned int num_queues; /* active queues, resource allocated */
unsigned int stalled_queues;
spinlock_t lock;
#ifdef CONFIG_DEBUG_FS #ifdef CONFIG_DEBUG_FS
struct dentry *xenvif_dbg_root; struct dentry *xenvif_dbg_root;
...@@ -249,6 +246,14 @@ struct xenvif { ...@@ -249,6 +246,14 @@ struct xenvif {
struct net_device *dev; struct net_device *dev;
}; };
struct xenvif_rx_cb {
unsigned long expires;
int meta_slots_used;
bool full_coalesce;
};
#define XENVIF_RX_CB(skb) ((struct xenvif_rx_cb *)(skb)->cb)
static inline struct xenbus_device *xenvif_to_xenbus_device(struct xenvif *vif) static inline struct xenbus_device *xenvif_to_xenbus_device(struct xenvif *vif)
{ {
return to_xenbus_device(vif->dev->dev.parent); return to_xenbus_device(vif->dev->dev.parent);
...@@ -272,8 +277,6 @@ void xenvif_xenbus_fini(void); ...@@ -272,8 +277,6 @@ void xenvif_xenbus_fini(void);
int xenvif_schedulable(struct xenvif *vif); int xenvif_schedulable(struct xenvif *vif);
int xenvif_must_stop_queue(struct xenvif_queue *queue);
int xenvif_queue_stopped(struct xenvif_queue *queue); int xenvif_queue_stopped(struct xenvif_queue *queue);
void xenvif_wake_queue(struct xenvif_queue *queue); void xenvif_wake_queue(struct xenvif_queue *queue);
...@@ -296,6 +299,8 @@ void xenvif_kick_thread(struct xenvif_queue *queue); ...@@ -296,6 +299,8 @@ void xenvif_kick_thread(struct xenvif_queue *queue);
int xenvif_dealloc_kthread(void *data); int xenvif_dealloc_kthread(void *data);
void xenvif_rx_queue_tail(struct xenvif_queue *queue, struct sk_buff *skb);
/* Determine whether the needed number of slots (req) are available, /* Determine whether the needed number of slots (req) are available,
* and set req_event if not. * and set req_event if not.
*/ */
......
...@@ -43,6 +43,9 @@ ...@@ -43,6 +43,9 @@
#define XENVIF_QUEUE_LENGTH 32 #define XENVIF_QUEUE_LENGTH 32
#define XENVIF_NAPI_WEIGHT 64 #define XENVIF_NAPI_WEIGHT 64
/* Number of bytes allowed on the internal guest Rx queue. */
#define XENVIF_RX_QUEUE_BYTES (XEN_NETIF_RX_RING_SIZE/2 * PAGE_SIZE)
/* This function is used to set SKBTX_DEV_ZEROCOPY as well as /* This function is used to set SKBTX_DEV_ZEROCOPY as well as
* increasing the inflight counter. We need to increase the inflight * increasing the inflight counter. We need to increase the inflight
* counter because core driver calls into xenvif_zerocopy_callback * counter because core driver calls into xenvif_zerocopy_callback
...@@ -60,20 +63,11 @@ void xenvif_skb_zerocopy_complete(struct xenvif_queue *queue) ...@@ -60,20 +63,11 @@ void xenvif_skb_zerocopy_complete(struct xenvif_queue *queue)
atomic_dec(&queue->inflight_packets); atomic_dec(&queue->inflight_packets);
} }
static inline void xenvif_stop_queue(struct xenvif_queue *queue)
{
struct net_device *dev = queue->vif->dev;
if (!queue->vif->can_queue)
return;
netif_tx_stop_queue(netdev_get_tx_queue(dev, queue->id));
}
int xenvif_schedulable(struct xenvif *vif) int xenvif_schedulable(struct xenvif *vif)
{ {
return netif_running(vif->dev) && return netif_running(vif->dev) &&
test_bit(VIF_STATUS_CONNECTED, &vif->status); test_bit(VIF_STATUS_CONNECTED, &vif->status) &&
!vif->disabled;
} }
static irqreturn_t xenvif_tx_interrupt(int irq, void *dev_id) static irqreturn_t xenvif_tx_interrupt(int irq, void *dev_id)
...@@ -114,16 +108,7 @@ int xenvif_poll(struct napi_struct *napi, int budget) ...@@ -114,16 +108,7 @@ int xenvif_poll(struct napi_struct *napi, int budget)
static irqreturn_t xenvif_rx_interrupt(int irq, void *dev_id) static irqreturn_t xenvif_rx_interrupt(int irq, void *dev_id)
{ {
struct xenvif_queue *queue = dev_id; struct xenvif_queue *queue = dev_id;
struct netdev_queue *net_queue =
netdev_get_tx_queue(queue->vif->dev, queue->id);
/* QUEUE_STATUS_RX_PURGE_EVENT is only set if either QDisc was off OR
* the carrier went down and this queue was previously blocked
*/
if (unlikely(netif_tx_queue_stopped(net_queue) ||
(!netif_carrier_ok(queue->vif->dev) &&
test_bit(QUEUE_STATUS_RX_STALLED, &queue->status))))
set_bit(QUEUE_STATUS_RX_PURGE_EVENT, &queue->status);
xenvif_kick_thread(queue); xenvif_kick_thread(queue);
return IRQ_HANDLED; return IRQ_HANDLED;
...@@ -151,24 +136,13 @@ void xenvif_wake_queue(struct xenvif_queue *queue) ...@@ -151,24 +136,13 @@ void xenvif_wake_queue(struct xenvif_queue *queue)
netif_tx_wake_queue(netdev_get_tx_queue(dev, id)); netif_tx_wake_queue(netdev_get_tx_queue(dev, id));
} }
/* Callback to wake the queue's thread and turn the carrier off on timeout */
static void xenvif_rx_stalled(unsigned long data)
{
struct xenvif_queue *queue = (struct xenvif_queue *)data;
if (xenvif_queue_stopped(queue)) {
set_bit(QUEUE_STATUS_RX_PURGE_EVENT, &queue->status);
xenvif_kick_thread(queue);
}
}
static int xenvif_start_xmit(struct sk_buff *skb, struct net_device *dev) static int xenvif_start_xmit(struct sk_buff *skb, struct net_device *dev)
{ {
struct xenvif *vif = netdev_priv(dev); struct xenvif *vif = netdev_priv(dev);
struct xenvif_queue *queue = NULL; struct xenvif_queue *queue = NULL;
unsigned int num_queues = vif->num_queues; unsigned int num_queues = vif->num_queues;
u16 index; u16 index;
int min_slots_needed; struct xenvif_rx_cb *cb;
BUG_ON(skb->dev != dev); BUG_ON(skb->dev != dev);
...@@ -191,30 +165,10 @@ static int xenvif_start_xmit(struct sk_buff *skb, struct net_device *dev) ...@@ -191,30 +165,10 @@ static int xenvif_start_xmit(struct sk_buff *skb, struct net_device *dev)
!xenvif_schedulable(vif)) !xenvif_schedulable(vif))
goto drop; goto drop;
/* At best we'll need one slot for the header and one for each cb = XENVIF_RX_CB(skb);
* frag. cb->expires = jiffies + rx_drain_timeout_jiffies;
*/
min_slots_needed = 1 + skb_shinfo(skb)->nr_frags;
/* If the skb is GSO then we'll also need an extra slot for the
* metadata.
*/
if (skb_is_gso(skb))
min_slots_needed++;
/* If the skb can't possibly fit in the remaining slots
* then turn off the queue to give the ring a chance to
* drain.
*/
if (!xenvif_rx_ring_slots_available(queue, min_slots_needed)) {
queue->rx_stalled.function = xenvif_rx_stalled;
queue->rx_stalled.data = (unsigned long)queue;
xenvif_stop_queue(queue);
mod_timer(&queue->rx_stalled,
jiffies + rx_drain_timeout_jiffies);
}
skb_queue_tail(&queue->rx_queue, skb); xenvif_rx_queue_tail(queue, skb);
xenvif_kick_thread(queue); xenvif_kick_thread(queue);
return NETDEV_TX_OK; return NETDEV_TX_OK;
...@@ -465,6 +419,8 @@ struct xenvif *xenvif_alloc(struct device *parent, domid_t domid, ...@@ -465,6 +419,8 @@ struct xenvif *xenvif_alloc(struct device *parent, domid_t domid,
vif->queues = NULL; vif->queues = NULL;
vif->num_queues = 0; vif->num_queues = 0;
spin_lock_init(&vif->lock);
dev->netdev_ops = &xenvif_netdev_ops; dev->netdev_ops = &xenvif_netdev_ops;
dev->hw_features = NETIF_F_SG | dev->hw_features = NETIF_F_SG |
NETIF_F_IP_CSUM | NETIF_F_IPV6_CSUM | NETIF_F_IP_CSUM | NETIF_F_IPV6_CSUM |
...@@ -508,6 +464,8 @@ int xenvif_init_queue(struct xenvif_queue *queue) ...@@ -508,6 +464,8 @@ int xenvif_init_queue(struct xenvif_queue *queue)
init_timer(&queue->credit_timeout); init_timer(&queue->credit_timeout);
queue->credit_window_start = get_jiffies_64(); queue->credit_window_start = get_jiffies_64();
queue->rx_queue_max = XENVIF_RX_QUEUE_BYTES;
skb_queue_head_init(&queue->rx_queue); skb_queue_head_init(&queue->rx_queue);
skb_queue_head_init(&queue->tx_queue); skb_queue_head_init(&queue->tx_queue);
...@@ -539,8 +497,6 @@ int xenvif_init_queue(struct xenvif_queue *queue) ...@@ -539,8 +497,6 @@ int xenvif_init_queue(struct xenvif_queue *queue)
queue->grant_tx_handle[i] = NETBACK_INVALID_HANDLE; queue->grant_tx_handle[i] = NETBACK_INVALID_HANDLE;
} }
init_timer(&queue->rx_stalled);
return 0; return 0;
} }
...@@ -551,7 +507,6 @@ void xenvif_carrier_on(struct xenvif *vif) ...@@ -551,7 +507,6 @@ void xenvif_carrier_on(struct xenvif *vif)
dev_set_mtu(vif->dev, ETH_DATA_LEN); dev_set_mtu(vif->dev, ETH_DATA_LEN);
netdev_update_features(vif->dev); netdev_update_features(vif->dev);
set_bit(VIF_STATUS_CONNECTED, &vif->status); set_bit(VIF_STATUS_CONNECTED, &vif->status);
netif_carrier_on(vif->dev);
if (netif_running(vif->dev)) if (netif_running(vif->dev))
xenvif_up(vif); xenvif_up(vif);
rtnl_unlock(); rtnl_unlock();
...@@ -611,6 +566,8 @@ int xenvif_connect(struct xenvif_queue *queue, unsigned long tx_ring_ref, ...@@ -611,6 +566,8 @@ int xenvif_connect(struct xenvif_queue *queue, unsigned long tx_ring_ref,
disable_irq(queue->rx_irq); disable_irq(queue->rx_irq);
} }
queue->stalled = true;
task = kthread_create(xenvif_kthread_guest_rx, task = kthread_create(xenvif_kthread_guest_rx,
(void *)queue, "%s-guest-rx", queue->name); (void *)queue, "%s-guest-rx", queue->name);
if (IS_ERR(task)) { if (IS_ERR(task)) {
...@@ -674,7 +631,6 @@ void xenvif_disconnect(struct xenvif *vif) ...@@ -674,7 +631,6 @@ void xenvif_disconnect(struct xenvif *vif)
netif_napi_del(&queue->napi); netif_napi_del(&queue->napi);
if (queue->task) { if (queue->task) {
del_timer_sync(&queue->rx_stalled);
kthread_stop(queue->task); kthread_stop(queue->task);
queue->task = NULL; queue->task = NULL;
} }
......
...@@ -55,13 +55,20 @@ ...@@ -55,13 +55,20 @@
bool separate_tx_rx_irq = 1; bool separate_tx_rx_irq = 1;
module_param(separate_tx_rx_irq, bool, 0644); module_param(separate_tx_rx_irq, bool, 0644);
/* When guest ring is filled up, qdisc queues the packets for us, but we have /* The time that packets can stay on the guest Rx internal queue
* to timeout them, otherwise other guests' packets can get stuck there * before they are dropped.
*/ */
unsigned int rx_drain_timeout_msecs = 10000; unsigned int rx_drain_timeout_msecs = 10000;
module_param(rx_drain_timeout_msecs, uint, 0444); module_param(rx_drain_timeout_msecs, uint, 0444);
unsigned int rx_drain_timeout_jiffies; unsigned int rx_drain_timeout_jiffies;
/* The length of time before the frontend is considered unresponsive
* because it isn't providing Rx slots.
*/
static unsigned int rx_stall_timeout_msecs = 60000;
module_param(rx_stall_timeout_msecs, uint, 0444);
static unsigned int rx_stall_timeout_jiffies;
unsigned int xenvif_max_queues; unsigned int xenvif_max_queues;
module_param_named(max_queues, xenvif_max_queues, uint, 0644); module_param_named(max_queues, xenvif_max_queues, uint, 0644);
MODULE_PARM_DESC(max_queues, MODULE_PARM_DESC(max_queues,
...@@ -83,7 +90,6 @@ static void make_tx_response(struct xenvif_queue *queue, ...@@ -83,7 +90,6 @@ static void make_tx_response(struct xenvif_queue *queue,
s8 st); s8 st);
static inline int tx_work_todo(struct xenvif_queue *queue); static inline int tx_work_todo(struct xenvif_queue *queue);
static inline int rx_work_todo(struct xenvif_queue *queue);
static struct xen_netif_rx_response *make_rx_response(struct xenvif_queue *queue, static struct xen_netif_rx_response *make_rx_response(struct xenvif_queue *queue,
u16 id, u16 id,
...@@ -163,6 +169,69 @@ bool xenvif_rx_ring_slots_available(struct xenvif_queue *queue, int needed) ...@@ -163,6 +169,69 @@ bool xenvif_rx_ring_slots_available(struct xenvif_queue *queue, int needed)
return false; return false;
} }
void xenvif_rx_queue_tail(struct xenvif_queue *queue, struct sk_buff *skb)
{
unsigned long flags;
spin_lock_irqsave(&queue->rx_queue.lock, flags);
__skb_queue_tail(&queue->rx_queue, skb);
queue->rx_queue_len += skb->len;
if (queue->rx_queue_len > queue->rx_queue_max)
netif_tx_stop_queue(netdev_get_tx_queue(queue->vif->dev, queue->id));
spin_unlock_irqrestore(&queue->rx_queue.lock, flags);
}
static struct sk_buff *xenvif_rx_dequeue(struct xenvif_queue *queue)
{
struct sk_buff *skb;
spin_lock_irq(&queue->rx_queue.lock);
skb = __skb_dequeue(&queue->rx_queue);
if (skb)
queue->rx_queue_len -= skb->len;
spin_unlock_irq(&queue->rx_queue.lock);
return skb;
}
static void xenvif_rx_queue_maybe_wake(struct xenvif_queue *queue)
{
spin_lock_irq(&queue->rx_queue.lock);
if (queue->rx_queue_len < queue->rx_queue_max)
netif_tx_wake_queue(netdev_get_tx_queue(queue->vif->dev, queue->id));
spin_unlock_irq(&queue->rx_queue.lock);
}
static void xenvif_rx_queue_purge(struct xenvif_queue *queue)
{
struct sk_buff *skb;
while ((skb = xenvif_rx_dequeue(queue)) != NULL)
kfree_skb(skb);
}
static void xenvif_rx_queue_drop_expired(struct xenvif_queue *queue)
{
struct sk_buff *skb;
for(;;) {
skb = skb_peek(&queue->rx_queue);
if (!skb)
break;
if (time_before(jiffies, XENVIF_RX_CB(skb)->expires))
break;
xenvif_rx_dequeue(queue);
kfree_skb(skb);
}
}
/* /*
* Returns true if we should start a new receive buffer instead of * Returns true if we should start a new receive buffer instead of
* adding 'size' bytes to a buffer which currently contains 'offset' * adding 'size' bytes to a buffer which currently contains 'offset'
...@@ -237,13 +306,6 @@ static struct xenvif_rx_meta *get_next_rx_buffer(struct xenvif_queue *queue, ...@@ -237,13 +306,6 @@ static struct xenvif_rx_meta *get_next_rx_buffer(struct xenvif_queue *queue,
return meta; return meta;
} }
struct xenvif_rx_cb {
int meta_slots_used;
bool full_coalesce;
};
#define XENVIF_RX_CB(skb) ((struct xenvif_rx_cb *)(skb)->cb)
/* /*
* Set up the grant operations for this fragment. If it's a flipping * Set up the grant operations for this fragment. If it's a flipping
* interface, we also set up the unmap request from here. * interface, we also set up the unmap request from here.
...@@ -587,12 +649,15 @@ static void xenvif_rx_action(struct xenvif_queue *queue) ...@@ -587,12 +649,15 @@ static void xenvif_rx_action(struct xenvif_queue *queue)
skb_queue_head_init(&rxq); skb_queue_head_init(&rxq);
while ((skb = skb_dequeue(&queue->rx_queue)) != NULL) { while (xenvif_rx_ring_slots_available(queue, XEN_NETBK_RX_SLOTS_MAX)
&& (skb = xenvif_rx_dequeue(queue)) != NULL) {
RING_IDX max_slots_needed; RING_IDX max_slots_needed;
RING_IDX old_req_cons; RING_IDX old_req_cons;
RING_IDX ring_slots_used; RING_IDX ring_slots_used;
int i; int i;
queue->last_rx_time = jiffies;
/* We need a cheap worse case estimate for the number of /* We need a cheap worse case estimate for the number of
* slots we'll use. * slots we'll use.
*/ */
...@@ -634,15 +699,6 @@ static void xenvif_rx_action(struct xenvif_queue *queue) ...@@ -634,15 +699,6 @@ static void xenvif_rx_action(struct xenvif_queue *queue)
skb_shinfo(skb)->gso_type & SKB_GSO_TCPV6)) skb_shinfo(skb)->gso_type & SKB_GSO_TCPV6))
max_slots_needed++; max_slots_needed++;
/* If the skb may not fit then bail out now */
if (!xenvif_rx_ring_slots_available(queue, max_slots_needed)) {
skb_queue_head(&queue->rx_queue, skb);
need_to_notify = true;
queue->rx_last_skb_slots = max_slots_needed;
break;
} else
queue->rx_last_skb_slots = 0;
old_req_cons = queue->rx.req_cons; old_req_cons = queue->rx.req_cons;
XENVIF_RX_CB(skb)->meta_slots_used = xenvif_gop_skb(skb, &npo, queue); XENVIF_RX_CB(skb)->meta_slots_used = xenvif_gop_skb(skb, &npo, queue);
ring_slots_used = queue->rx.req_cons - old_req_cons; ring_slots_used = queue->rx.req_cons - old_req_cons;
...@@ -1869,12 +1925,6 @@ void xenvif_idx_unmap(struct xenvif_queue *queue, u16 pending_idx) ...@@ -1869,12 +1925,6 @@ void xenvif_idx_unmap(struct xenvif_queue *queue, u16 pending_idx)
} }
} }
static inline int rx_work_todo(struct xenvif_queue *queue)
{
return (!skb_queue_empty(&queue->rx_queue) &&
xenvif_rx_ring_slots_available(queue, queue->rx_last_skb_slots));
}
static inline int tx_work_todo(struct xenvif_queue *queue) static inline int tx_work_todo(struct xenvif_queue *queue)
{ {
if (likely(RING_HAS_UNCONSUMED_REQUESTS(&queue->tx))) if (likely(RING_HAS_UNCONSUMED_REQUESTS(&queue->tx)))
...@@ -1931,92 +1981,121 @@ int xenvif_map_frontend_rings(struct xenvif_queue *queue, ...@@ -1931,92 +1981,121 @@ int xenvif_map_frontend_rings(struct xenvif_queue *queue,
return err; return err;
} }
static void xenvif_start_queue(struct xenvif_queue *queue) static void xenvif_queue_carrier_off(struct xenvif_queue *queue)
{ {
if (xenvif_schedulable(queue->vif)) struct xenvif *vif = queue->vif;
xenvif_wake_queue(queue);
queue->stalled = true;
/* At least one queue has stalled? Disable the carrier. */
spin_lock(&vif->lock);
if (vif->stalled_queues++ == 0) {
netdev_info(vif->dev, "Guest Rx stalled");
netif_carrier_off(vif->dev);
}
spin_unlock(&vif->lock);
} }
/* Only called from the queue's thread, it handles the situation when the guest static void xenvif_queue_carrier_on(struct xenvif_queue *queue)
* doesn't post enough requests on the receiving ring.
* First xenvif_start_xmit disables QDisc and start a timer, and then either the
* timer fires, or the guest send an interrupt after posting new request. If it
* is the timer, the carrier is turned off here.
* */
static void xenvif_rx_purge_event(struct xenvif_queue *queue)
{ {
/* Either the last unsuccesful skb or at least 1 slot should fit */ struct xenvif *vif = queue->vif;
int needed = queue->rx_last_skb_slots ?
queue->rx_last_skb_slots : 1;
/* It is assumed that if the guest post new slots after this, the RX
* interrupt will set the QUEUE_STATUS_RX_PURGE_EVENT bit and wake up
* the thread again
*/
set_bit(QUEUE_STATUS_RX_STALLED, &queue->status);
if (!xenvif_rx_ring_slots_available(queue, needed)) {
rtnl_lock();
if (netif_carrier_ok(queue->vif->dev)) {
/* Timer fired and there are still no slots. Turn off
* everything except the interrupts
*/
netif_carrier_off(queue->vif->dev);
skb_queue_purge(&queue->rx_queue);
queue->rx_last_skb_slots = 0;
if (net_ratelimit())
netdev_err(queue->vif->dev, "Carrier off due to lack of guest response on queue %d\n", queue->id);
} else {
/* Probably an another queue already turned the carrier
* off, make sure nothing is stucked in the internal
* queue of this queue
*/
skb_queue_purge(&queue->rx_queue);
queue->rx_last_skb_slots = 0;
}
rtnl_unlock();
} else if (!netif_carrier_ok(queue->vif->dev)) {
unsigned int num_queues = queue->vif->num_queues;
unsigned int i;
/* The carrier was down, but an interrupt kicked
* the thread again after new requests were
* posted
*/
clear_bit(QUEUE_STATUS_RX_STALLED,
&queue->status);
rtnl_lock();
netif_carrier_on(queue->vif->dev);
netif_tx_wake_all_queues(queue->vif->dev);
rtnl_unlock();
for (i = 0; i < num_queues; i++) { queue->last_rx_time = jiffies; /* Reset Rx stall detection. */
struct xenvif_queue *temp = &queue->vif->queues[i]; queue->stalled = false;
xenvif_napi_schedule_or_enable_events(temp); /* All queues are ready? Enable the carrier. */
spin_lock(&vif->lock);
if (--vif->stalled_queues == 0) {
netdev_info(vif->dev, "Guest Rx ready");
netif_carrier_on(vif->dev);
} }
if (net_ratelimit()) spin_unlock(&vif->lock);
netdev_err(queue->vif->dev, "Carrier on again\n"); }
} else {
/* Queuing were stopped, but the guest posted static bool xenvif_rx_queue_stalled(struct xenvif_queue *queue)
* new requests and sent an interrupt {
RING_IDX prod, cons;
prod = queue->rx.sring->req_prod;
cons = queue->rx.req_cons;
return !queue->stalled
&& prod - cons < XEN_NETBK_RX_SLOTS_MAX
&& time_after(jiffies,
queue->last_rx_time + rx_stall_timeout_jiffies);
}
static bool xenvif_rx_queue_ready(struct xenvif_queue *queue)
{
RING_IDX prod, cons;
prod = queue->rx.sring->req_prod;
cons = queue->rx.req_cons;
return queue->stalled
&& prod - cons >= XEN_NETBK_RX_SLOTS_MAX;
}
static bool xenvif_have_rx_work(struct xenvif_queue *queue)
{
return (!skb_queue_empty(&queue->rx_queue)
&& xenvif_rx_ring_slots_available(queue, XEN_NETBK_RX_SLOTS_MAX))
|| xenvif_rx_queue_stalled(queue)
|| xenvif_rx_queue_ready(queue)
|| kthread_should_stop()
|| queue->vif->disabled;
}
static long xenvif_rx_queue_timeout(struct xenvif_queue *queue)
{
struct sk_buff *skb;
long timeout;
skb = skb_peek(&queue->rx_queue);
if (!skb)
return MAX_SCHEDULE_TIMEOUT;
timeout = XENVIF_RX_CB(skb)->expires - jiffies;
return timeout < 0 ? 0 : timeout;
}
/* Wait until the guest Rx thread has work.
*
* The timeout needs to be adjusted based on the current head of the
* queue (and not just the head at the beginning). In particular, if
* the queue is initially empty an infinite timeout is used and this
* needs to be reduced when a skb is queued.
*
* This cannot be done with wait_event_timeout() because it only
* calculates the timeout once.
*/ */
clear_bit(QUEUE_STATUS_RX_STALLED, static void xenvif_wait_for_rx_work(struct xenvif_queue *queue)
&queue->status); {
del_timer_sync(&queue->rx_stalled); DEFINE_WAIT(wait);
xenvif_start_queue(queue);
if (xenvif_have_rx_work(queue))
return;
for (;;) {
long ret;
prepare_to_wait(&queue->wq, &wait, TASK_INTERRUPTIBLE);
if (xenvif_have_rx_work(queue))
break;
ret = schedule_timeout(xenvif_rx_queue_timeout(queue));
if (!ret)
break;
} }
finish_wait(&queue->wq, &wait);
} }
int xenvif_kthread_guest_rx(void *data) int xenvif_kthread_guest_rx(void *data)
{ {
struct xenvif_queue *queue = data; struct xenvif_queue *queue = data;
struct sk_buff *skb; struct xenvif *vif = queue->vif;
while (!kthread_should_stop()) { for (;;) {
wait_event_interruptible(queue->wq, xenvif_wait_for_rx_work(queue);
rx_work_todo(queue) ||
queue->vif->disabled ||
test_bit(QUEUE_STATUS_RX_PURGE_EVENT, &queue->status) ||
kthread_should_stop());
if (kthread_should_stop()) if (kthread_should_stop())
break; break;
...@@ -2028,35 +2107,38 @@ int xenvif_kthread_guest_rx(void *data) ...@@ -2028,35 +2107,38 @@ int xenvif_kthread_guest_rx(void *data)
* context so we defer it here, if this thread is * context so we defer it here, if this thread is
* associated with queue 0. * associated with queue 0.
*/ */
if (unlikely(queue->vif->disabled && queue->id == 0)) { if (unlikely(vif->disabled && queue->id == 0)) {
xenvif_carrier_off(queue->vif); xenvif_carrier_off(vif);
} else if (unlikely(queue->vif->disabled)) { xenvif_rx_queue_purge(queue);
/* kthread_stop() would be called upon this thread soon, continue;
* be a bit proactive
*/
skb_queue_purge(&queue->rx_queue);
queue->rx_last_skb_slots = 0;
} else if (unlikely(test_and_clear_bit(QUEUE_STATUS_RX_PURGE_EVENT,
&queue->status))) {
xenvif_rx_purge_event(queue);
} else if (!netif_carrier_ok(queue->vif->dev)) {
/* Another queue stalled and turned the carrier off, so
* purge the internal queue of queues which were not
* blocked
*/
skb_queue_purge(&queue->rx_queue);
queue->rx_last_skb_slots = 0;
} }
if (!skb_queue_empty(&queue->rx_queue)) if (!skb_queue_empty(&queue->rx_queue))
xenvif_rx_action(queue); xenvif_rx_action(queue);
/* If the guest hasn't provided any Rx slots for a
* while it's probably not responsive, drop the
* carrier so packets are dropped earlier.
*/
if (xenvif_rx_queue_stalled(queue))
xenvif_queue_carrier_off(queue);
else if (xenvif_rx_queue_ready(queue))
xenvif_queue_carrier_on(queue);
/* Queued packets may have foreign pages from other
* domains. These cannot be queued indefinitely as
* this would starve guests of grant refs and transmit
* slots.
*/
xenvif_rx_queue_drop_expired(queue);
xenvif_rx_queue_maybe_wake(queue);
cond_resched(); cond_resched();
} }
/* Bin any remaining skbs */ /* Bin any remaining skbs */
while ((skb = skb_dequeue(&queue->rx_queue)) != NULL) xenvif_rx_queue_purge(queue);
dev_kfree_skb(skb);
return 0; return 0;
} }
...@@ -2113,6 +2195,7 @@ static int __init netback_init(void) ...@@ -2113,6 +2195,7 @@ static int __init netback_init(void)
goto failed_init; goto failed_init;
rx_drain_timeout_jiffies = msecs_to_jiffies(rx_drain_timeout_msecs); rx_drain_timeout_jiffies = msecs_to_jiffies(rx_drain_timeout_msecs);
rx_stall_timeout_jiffies = msecs_to_jiffies(rx_stall_timeout_msecs);
#ifdef CONFIG_DEBUG_FS #ifdef CONFIG_DEBUG_FS
xen_netback_dbg_root = debugfs_create_dir("xen-netback", NULL); xen_netback_dbg_root = debugfs_create_dir("xen-netback", NULL);
......
...@@ -52,6 +52,7 @@ static int xenvif_read_io_ring(struct seq_file *m, void *v) ...@@ -52,6 +52,7 @@ static int xenvif_read_io_ring(struct seq_file *m, void *v)
struct xenvif_queue *queue = m->private; struct xenvif_queue *queue = m->private;
struct xen_netif_tx_back_ring *tx_ring = &queue->tx; struct xen_netif_tx_back_ring *tx_ring = &queue->tx;
struct xen_netif_rx_back_ring *rx_ring = &queue->rx; struct xen_netif_rx_back_ring *rx_ring = &queue->rx;
struct netdev_queue *dev_queue;
if (tx_ring->sring) { if (tx_ring->sring) {
struct xen_netif_tx_sring *sring = tx_ring->sring; struct xen_netif_tx_sring *sring = tx_ring->sring;
...@@ -112,6 +113,13 @@ static int xenvif_read_io_ring(struct seq_file *m, void *v) ...@@ -112,6 +113,13 @@ static int xenvif_read_io_ring(struct seq_file *m, void *v)
queue->credit_timeout.expires, queue->credit_timeout.expires,
jiffies); jiffies);
dev_queue = netdev_get_tx_queue(queue->vif->dev, queue->id);
seq_printf(m, "\nRx internal queue: len %u max %u pkts %u %s\n",
queue->rx_queue_len, queue->rx_queue_max,
skb_queue_len(&queue->rx_queue),
netif_tx_queue_stopped(dev_queue) ? "stopped" : "running");
return 0; return 0;
} }
...@@ -703,6 +711,7 @@ static void connect(struct backend_info *be) ...@@ -703,6 +711,7 @@ static void connect(struct backend_info *be)
be->vif->queues = vzalloc(requested_num_queues * be->vif->queues = vzalloc(requested_num_queues *
sizeof(struct xenvif_queue)); sizeof(struct xenvif_queue));
be->vif->num_queues = requested_num_queues; be->vif->num_queues = requested_num_queues;
be->vif->stalled_queues = requested_num_queues;
for (queue_index = 0; queue_index < requested_num_queues; ++queue_index) { for (queue_index = 0; queue_index < requested_num_queues; ++queue_index) {
queue = &be->vif->queues[queue_index]; queue = &be->vif->queues[queue_index];
...@@ -873,15 +882,10 @@ static int read_xenbus_vif_flags(struct backend_info *be) ...@@ -873,15 +882,10 @@ static int read_xenbus_vif_flags(struct backend_info *be)
if (!rx_copy) if (!rx_copy)
return -EOPNOTSUPP; return -EOPNOTSUPP;
if (vif->dev->tx_queue_len != 0) {
if (xenbus_scanf(XBT_NIL, dev->otherend, if (xenbus_scanf(XBT_NIL, dev->otherend,
"feature-rx-notify", "%d", &val) < 0) "feature-rx-notify", "%d", &val) < 0 || val == 0) {
val = 0; xenbus_dev_fatal(dev, -EINVAL, "feature-rx-notify is mandatory");
if (val) return -EINVAL;
vif->can_queue = 1;
else
/* Must be non-zero for pfifo_fast to work. */
vif->dev->tx_queue_len = 1;
} }
if (xenbus_scanf(XBT_NIL, dev->otherend, "feature-sg", if (xenbus_scanf(XBT_NIL, dev->otherend, "feature-sg",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment