Commit 44b9b6ca authored by David S. Miller's avatar David S. Miller

Merge branch 'net-sched-move-back-qlen-to-per-CPU-accounting'

Paolo Abeni says:

====================
net: sched: move back qlen to per CPU accounting

The commit 46b1c18f ("net: sched: put back q.qlen into a single location")
introduced some measurable regression in the contended scenarios for
lock qdisc.

As Eric suggested we could replace q.qlen access with calls to qdisc_is_empty()
in the datapath and revert the above commit. The TC subsystem updates
qdisc->is_empty in a somewhat loose way: notably 'is_empty' is set only when
the qdisc dequeue() calls return a NULL ptr. That is, the invocation after
the last packet is dequeued.

The above is good enough for BYPASS implementation - the only downside is that
we end up avoiding the optimization for a very small time-frame - but will
break hard things when internal structures consistency for classful qdisc
relies on child qdisc_is_empty().

A more strict 'is_empty' update adds a relevant complexity to its life-cycle, so
this series takes a different approach: we allow lockless qdisc to switch from
per CPU accounting to global stats accounting when the NOLOCK bit is cleared.
Since most pieces of infrastructure are already in place, this requires very
little changes to the pfifo_fast qdisc, and any later NOLOCK qdisc can hook
there with little effort - no need to maintain two different implementations.

The first 2 patches removes direct qlen access from non core TC code, the 3rd
and 4th patches place and use the infrastructure to allow stats account
switching and the 5th patch is the actual revert.

 v1 -> v2:
  - fixed build issues
  - more descriptive commit message for patch 5/5
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 4c75be07 73eb628d
......@@ -52,10 +52,7 @@ struct qdisc_size_table {
struct qdisc_skb_head {
struct sk_buff *head;
struct sk_buff *tail;
union {
u32 qlen;
atomic_t atomic_qlen;
};
__u32 qlen;
spinlock_t lock;
};
......@@ -146,9 +143,14 @@ static inline bool qdisc_is_running(struct Qdisc *qdisc)
return (raw_read_seqcount(&qdisc->running) & 1) ? true : false;
}
static inline bool qdisc_is_percpu_stats(const struct Qdisc *q)
{
return q->flags & TCQ_F_CPUSTATS;
}
static inline bool qdisc_is_empty(const struct Qdisc *qdisc)
{
if (qdisc->flags & TCQ_F_NOLOCK)
if (qdisc_is_percpu_stats(qdisc))
return qdisc->empty;
return !qdisc->q.qlen;
}
......@@ -481,19 +483,27 @@ static inline void qdisc_cb_private_validate(const struct sk_buff *skb, int sz)
BUILD_BUG_ON(sizeof(qcb->data) < sz);
}
static inline int qdisc_qlen_cpu(const struct Qdisc *q)
{
return this_cpu_ptr(q->cpu_qstats)->qlen;
}
static inline int qdisc_qlen(const struct Qdisc *q)
{
return q->q.qlen;
}
static inline u32 qdisc_qlen_sum(const struct Qdisc *q)
static inline int qdisc_qlen_sum(const struct Qdisc *q)
{
u32 qlen = q->qstats.qlen;
__u32 qlen = q->qstats.qlen;
int i;
if (q->flags & TCQ_F_NOLOCK)
qlen += atomic_read(&q->q.atomic_qlen);
else
if (qdisc_is_percpu_stats(q)) {
for_each_possible_cpu(i)
qlen += per_cpu_ptr(q->cpu_qstats, i)->qlen;
} else {
qlen += q->q.qlen;
}
return qlen;
}
......@@ -747,7 +757,7 @@ static inline bool qdisc_all_tx_empty(const struct net_device *dev)
struct netdev_queue *txq = netdev_get_tx_queue(dev, i);
const struct Qdisc *q = rcu_dereference(txq->qdisc);
if (q->q.qlen) {
if (!qdisc_is_empty(q)) {
rcu_read_unlock();
return false;
}
......@@ -817,11 +827,6 @@ static inline int qdisc_enqueue(struct sk_buff *skb, struct Qdisc *sch,
return sch->enqueue(skb, sch, to_free);
}
static inline bool qdisc_is_percpu_stats(const struct Qdisc *q)
{
return q->flags & TCQ_F_CPUSTATS;
}
static inline void _bstats_update(struct gnet_stats_basic_packed *bstats,
__u64 bytes, __u32 packets)
{
......@@ -889,14 +894,14 @@ static inline void qdisc_qstats_cpu_backlog_inc(struct Qdisc *sch,
this_cpu_add(sch->cpu_qstats->backlog, qdisc_pkt_len(skb));
}
static inline void qdisc_qstats_atomic_qlen_inc(struct Qdisc *sch)
static inline void qdisc_qstats_cpu_qlen_inc(struct Qdisc *sch)
{
atomic_inc(&sch->q.atomic_qlen);
this_cpu_inc(sch->cpu_qstats->qlen);
}
static inline void qdisc_qstats_atomic_qlen_dec(struct Qdisc *sch)
static inline void qdisc_qstats_cpu_qlen_dec(struct Qdisc *sch)
{
atomic_dec(&sch->q.atomic_qlen);
this_cpu_dec(sch->cpu_qstats->qlen);
}
static inline void qdisc_qstats_cpu_requeues_inc(struct Qdisc *sch)
......@@ -1106,6 +1111,32 @@ static inline struct sk_buff *qdisc_peek_dequeued(struct Qdisc *sch)
return skb;
}
static inline void qdisc_update_stats_at_dequeue(struct Qdisc *sch,
struct sk_buff *skb)
{
if (qdisc_is_percpu_stats(sch)) {
qdisc_qstats_cpu_backlog_dec(sch, skb);
qdisc_bstats_cpu_update(sch, skb);
qdisc_qstats_cpu_qlen_dec(sch);
} else {
qdisc_qstats_backlog_dec(sch, skb);
qdisc_bstats_update(sch, skb);
sch->q.qlen--;
}
}
static inline void qdisc_update_stats_at_enqueue(struct Qdisc *sch,
unsigned int pkt_len)
{
if (qdisc_is_percpu_stats(sch)) {
qdisc_qstats_cpu_qlen_inc(sch);
this_cpu_add(sch->cpu_qstats->backlog, pkt_len);
} else {
sch->qstats.backlog += pkt_len;
sch->q.qlen++;
}
}
/* use instead of qdisc->dequeue() for all qdiscs queried with ->peek() */
static inline struct sk_buff *qdisc_dequeue_peeked(struct Qdisc *sch)
{
......@@ -1113,8 +1144,13 @@ static inline struct sk_buff *qdisc_dequeue_peeked(struct Qdisc *sch)
if (skb) {
skb = __skb_dequeue(&sch->gso_skb);
if (qdisc_is_percpu_stats(sch)) {
qdisc_qstats_cpu_backlog_dec(sch, skb);
qdisc_qstats_cpu_qlen_dec(sch);
} else {
qdisc_qstats_backlog_dec(sch, skb);
sch->q.qlen--;
}
} else {
skb = sch->dequeue(sch);
}
......
......@@ -186,15 +186,19 @@ static int transmit(struct cflayer *layer, struct cfpkt *pkt)
goto noxoff;
if (likely(!netif_queue_stopped(caifd->netdev))) {
struct Qdisc *sch;
/* If we run with a TX queue, check if the queue is too long*/
txq = netdev_get_tx_queue(skb->dev, 0);
qlen = qdisc_qlen(rcu_dereference_bh(txq->qdisc));
if (likely(qlen == 0))
sch = rcu_dereference_bh(txq->qdisc);
if (likely(qdisc_is_empty(sch)))
goto noxoff;
/* can check for explicit qdisc len value only !NOLOCK,
* always set flow off otherwise
*/
high = (caifd->netdev->tx_queue_len * q_high) / 100;
if (likely(qlen < high))
if (!(sch->flags & TCQ_F_NOLOCK) && likely(sch->q.qlen < high))
goto noxoff;
}
......
......@@ -291,6 +291,7 @@ __gnet_stats_copy_queue_cpu(struct gnet_stats_queue *qstats,
for_each_possible_cpu(i) {
const struct gnet_stats_queue *qcpu = per_cpu_ptr(q, i);
qstats->qlen = 0;
qstats->backlog += qcpu->backlog;
qstats->drops += qcpu->drops;
qstats->requeues += qcpu->requeues;
......@@ -306,6 +307,7 @@ void __gnet_stats_copy_queue(struct gnet_stats_queue *qstats,
if (cpu) {
__gnet_stats_copy_queue_cpu(qstats, cpu);
} else {
qstats->qlen = q->qlen;
qstats->backlog = q->backlog;
qstats->drops = q->drops;
qstats->requeues = q->requeues;
......
......@@ -998,6 +998,19 @@ static void notify_and_destroy(struct net *net, struct sk_buff *skb,
qdisc_put(old);
}
static void qdisc_clear_nolock(struct Qdisc *sch)
{
sch->flags &= ~TCQ_F_NOLOCK;
if (!(sch->flags & TCQ_F_CPUSTATS))
return;
free_percpu(sch->cpu_bstats);
free_percpu(sch->cpu_qstats);
sch->cpu_bstats = NULL;
sch->cpu_qstats = NULL;
sch->flags &= ~TCQ_F_CPUSTATS;
}
/* Graft qdisc "new" to class "classid" of qdisc "parent" or
* to device "dev".
*
......@@ -1076,7 +1089,7 @@ static int qdisc_graft(struct net_device *dev, struct Qdisc *parent,
/* Only support running class lockless if parent is lockless */
if (new && (new->flags & TCQ_F_NOLOCK) &&
parent && !(parent->flags & TCQ_F_NOLOCK))
new->flags &= ~TCQ_F_NOLOCK;
qdisc_clear_nolock(new);
if (!cops || !cops->graft)
return -EOPNOTSUPP;
......
......@@ -68,7 +68,7 @@ static inline struct sk_buff *__skb_dequeue_bad_txq(struct Qdisc *q)
skb = __skb_dequeue(&q->skb_bad_txq);
if (qdisc_is_percpu_stats(q)) {
qdisc_qstats_cpu_backlog_dec(q, skb);
qdisc_qstats_atomic_qlen_dec(q);
qdisc_qstats_cpu_qlen_dec(q);
} else {
qdisc_qstats_backlog_dec(q, skb);
q->q.qlen--;
......@@ -108,7 +108,7 @@ static inline void qdisc_enqueue_skb_bad_txq(struct Qdisc *q,
if (qdisc_is_percpu_stats(q)) {
qdisc_qstats_cpu_backlog_inc(q, skb);
qdisc_qstats_atomic_qlen_inc(q);
qdisc_qstats_cpu_qlen_inc(q);
} else {
qdisc_qstats_backlog_inc(q, skb);
q->q.qlen++;
......@@ -118,52 +118,36 @@ static inline void qdisc_enqueue_skb_bad_txq(struct Qdisc *q,
spin_unlock(lock);
}
static inline int __dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
static inline void dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
{
while (skb) {
struct sk_buff *next = skb->next;
__skb_queue_tail(&q->gso_skb, skb);
q->qstats.requeues++;
qdisc_qstats_backlog_inc(q, skb);
q->q.qlen++; /* it's still part of the queue */
spinlock_t *lock = NULL;
skb = next;
if (q->flags & TCQ_F_NOLOCK) {
lock = qdisc_lock(q);
spin_lock(lock);
}
__netif_schedule(q);
return 0;
}
static inline int dev_requeue_skb_locked(struct sk_buff *skb, struct Qdisc *q)
{
spinlock_t *lock = qdisc_lock(q);
spin_lock(lock);
while (skb) {
struct sk_buff *next = skb->next;
__skb_queue_tail(&q->gso_skb, skb);
/* it's still part of the queue */
if (qdisc_is_percpu_stats(q)) {
qdisc_qstats_cpu_requeues_inc(q);
qdisc_qstats_cpu_backlog_inc(q, skb);
qdisc_qstats_atomic_qlen_inc(q);
qdisc_qstats_cpu_qlen_inc(q);
} else {
q->qstats.requeues++;
qdisc_qstats_backlog_inc(q, skb);
q->q.qlen++;
}
skb = next;
}
if (lock)
spin_unlock(lock);
__netif_schedule(q);
return 0;
}
static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
{
if (q->flags & TCQ_F_NOLOCK)
return dev_requeue_skb_locked(skb, q);
else
return __dev_requeue_skb(skb, q);
}
static void try_bulk_dequeue_skb(struct Qdisc *q,
......@@ -252,7 +236,7 @@ static struct sk_buff *dequeue_skb(struct Qdisc *q, bool *validate,
skb = __skb_dequeue(&q->gso_skb);
if (qdisc_is_percpu_stats(q)) {
qdisc_qstats_cpu_backlog_dec(q, skb);
qdisc_qstats_atomic_qlen_dec(q);
qdisc_qstats_cpu_qlen_dec(q);
} else {
qdisc_qstats_backlog_dec(q, skb);
q->q.qlen--;
......@@ -645,11 +629,7 @@ static int pfifo_fast_enqueue(struct sk_buff *skb, struct Qdisc *qdisc,
if (unlikely(err))
return qdisc_drop_cpu(skb, qdisc, to_free);
qdisc_qstats_atomic_qlen_inc(qdisc);
/* Note: skb can not be used after skb_array_produce(),
* so we better not use qdisc_qstats_cpu_backlog_inc()
*/
this_cpu_add(qdisc->cpu_qstats->backlog, pkt_len);
qdisc_update_stats_at_enqueue(qdisc, pkt_len);
return NET_XMIT_SUCCESS;
}
......@@ -668,9 +648,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
skb = __skb_array_consume(q);
}
if (likely(skb)) {
qdisc_qstats_cpu_backlog_dec(qdisc, skb);
qdisc_bstats_cpu_update(qdisc, skb);
qdisc_qstats_atomic_qlen_dec(qdisc);
qdisc_update_stats_at_dequeue(qdisc, skb);
} else {
qdisc->empty = true;
}
......@@ -716,6 +694,7 @@ static void pfifo_fast_reset(struct Qdisc *qdisc)
struct gnet_stats_queue *q = per_cpu_ptr(qdisc->cpu_qstats, i);
q->backlog = 0;
q->qlen = 0;
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment