Commit f646c75b authored by David S. Miller's avatar David S. Miller

Merge branch 'vhost_net-rx-batch-dequeuing'

Jason Wang says:

====================
vhost_net rx batch dequeuing

This series tries to implement rx batching for vhost-net. This is done
by batching the dequeuing from skb_array which was exported by
underlayer socket and pass the sbk back through msg_control to finish
userspace copying. This is also the requirement for more batching
implemention on rx path.

Tests shows at most 7.56% improvment bon rx pps on top of batch
zeroing and no obvious changes for TCP_STREAM/TCP_RR result.

Please review.

Thanks

Changes from V4:
- drop batch zeroing patch
- renew the performance numbers
- move skb pointer array out of vhost_net structure

Changes from V3:
- add batch zeroing patch to fix the build warnings

Changes from V2:
- rebase to net-next HEAD
- use unconsume helpers to put skb back on releasing
- introduce and use vhost_net internal buffer helpers
- renew performance numbers on top of batch zeroing

Changes from V1:
- switch to use for() in __ptr_ring_consume_batched()
- rename peek_head_len_batched() to fetch_skbs()
- use skb_array_consume_batched() instead of
  skb_array_consume_batched_bh() since no consumer run in bh
- drop the lockless peeking patch since skb_array could be resized, so
  it's not safe to call lockless one
====================
Acked-by: default avatarMichael S. Tsirkin <mst@redhat.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 1fc4d180 c67df11f
...@@ -824,15 +824,17 @@ static ssize_t tap_put_user(struct tap_queue *q, ...@@ -824,15 +824,17 @@ static ssize_t tap_put_user(struct tap_queue *q,
static ssize_t tap_do_read(struct tap_queue *q, static ssize_t tap_do_read(struct tap_queue *q,
struct iov_iter *to, struct iov_iter *to,
int noblock) int noblock, struct sk_buff *skb)
{ {
DEFINE_WAIT(wait); DEFINE_WAIT(wait);
struct sk_buff *skb;
ssize_t ret = 0; ssize_t ret = 0;
if (!iov_iter_count(to)) if (!iov_iter_count(to))
return 0; return 0;
if (skb)
goto put;
while (1) { while (1) {
if (!noblock) if (!noblock)
prepare_to_wait(sk_sleep(&q->sk), &wait, prepare_to_wait(sk_sleep(&q->sk), &wait,
...@@ -856,6 +858,7 @@ static ssize_t tap_do_read(struct tap_queue *q, ...@@ -856,6 +858,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
if (!noblock) if (!noblock)
finish_wait(sk_sleep(&q->sk), &wait); finish_wait(sk_sleep(&q->sk), &wait);
put:
if (skb) { if (skb) {
ret = tap_put_user(q, skb, to); ret = tap_put_user(q, skb, to);
if (unlikely(ret < 0)) if (unlikely(ret < 0))
...@@ -872,7 +875,7 @@ static ssize_t tap_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -872,7 +875,7 @@ static ssize_t tap_read_iter(struct kiocb *iocb, struct iov_iter *to)
struct tap_queue *q = file->private_data; struct tap_queue *q = file->private_data;
ssize_t len = iov_iter_count(to), ret; ssize_t len = iov_iter_count(to), ret;
ret = tap_do_read(q, to, file->f_flags & O_NONBLOCK); ret = tap_do_read(q, to, file->f_flags & O_NONBLOCK, NULL);
ret = min_t(ssize_t, ret, len); ret = min_t(ssize_t, ret, len);
if (ret > 0) if (ret > 0)
iocb->ki_pos = ret; iocb->ki_pos = ret;
...@@ -1155,7 +1158,8 @@ static int tap_recvmsg(struct socket *sock, struct msghdr *m, ...@@ -1155,7 +1158,8 @@ static int tap_recvmsg(struct socket *sock, struct msghdr *m,
int ret; int ret;
if (flags & ~(MSG_DONTWAIT|MSG_TRUNC)) if (flags & ~(MSG_DONTWAIT|MSG_TRUNC))
return -EINVAL; return -EINVAL;
ret = tap_do_read(q, &m->msg_iter, flags & MSG_DONTWAIT); ret = tap_do_read(q, &m->msg_iter, flags & MSG_DONTWAIT,
m->msg_control);
if (ret > total_len) { if (ret > total_len) {
m->msg_flags |= MSG_TRUNC; m->msg_flags |= MSG_TRUNC;
ret = flags & MSG_TRUNC ? ret : total_len; ret = flags & MSG_TRUNC ? ret : total_len;
...@@ -1193,6 +1197,19 @@ struct socket *tap_get_socket(struct file *file) ...@@ -1193,6 +1197,19 @@ struct socket *tap_get_socket(struct file *file)
} }
EXPORT_SYMBOL_GPL(tap_get_socket); EXPORT_SYMBOL_GPL(tap_get_socket);
struct skb_array *tap_get_skb_array(struct file *file)
{
struct tap_queue *q;
if (file->f_op != &tap_fops)
return ERR_PTR(-EINVAL);
q = file->private_data;
if (!q)
return ERR_PTR(-EBADFD);
return &q->skb_array;
}
EXPORT_SYMBOL_GPL(tap_get_skb_array);
int tap_queue_resize(struct tap_dev *tap) int tap_queue_resize(struct tap_dev *tap)
{ {
struct net_device *dev = tap->dev; struct net_device *dev = tap->dev;
......
...@@ -1510,9 +1510,8 @@ static struct sk_buff *tun_ring_recv(struct tun_file *tfile, int noblock, ...@@ -1510,9 +1510,8 @@ static struct sk_buff *tun_ring_recv(struct tun_file *tfile, int noblock,
static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile, static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
struct iov_iter *to, struct iov_iter *to,
int noblock) int noblock, struct sk_buff *skb)
{ {
struct sk_buff *skb;
ssize_t ret; ssize_t ret;
int err; int err;
...@@ -1521,10 +1520,12 @@ static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile, ...@@ -1521,10 +1520,12 @@ static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
if (!iov_iter_count(to)) if (!iov_iter_count(to))
return 0; return 0;
if (!skb) {
/* Read frames from ring */ /* Read frames from ring */
skb = tun_ring_recv(tfile, noblock, &err); skb = tun_ring_recv(tfile, noblock, &err);
if (!skb) if (!skb)
return err; return err;
}
ret = tun_put_user(tun, tfile, skb, to); ret = tun_put_user(tun, tfile, skb, to);
if (unlikely(ret < 0)) if (unlikely(ret < 0))
...@@ -1544,7 +1545,7 @@ static ssize_t tun_chr_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -1544,7 +1545,7 @@ static ssize_t tun_chr_read_iter(struct kiocb *iocb, struct iov_iter *to)
if (!tun) if (!tun)
return -EBADFD; return -EBADFD;
ret = tun_do_read(tun, tfile, to, file->f_flags & O_NONBLOCK); ret = tun_do_read(tun, tfile, to, file->f_flags & O_NONBLOCK, NULL);
ret = min_t(ssize_t, ret, len); ret = min_t(ssize_t, ret, len);
if (ret > 0) if (ret > 0)
iocb->ki_pos = ret; iocb->ki_pos = ret;
...@@ -1646,7 +1647,8 @@ static int tun_recvmsg(struct socket *sock, struct msghdr *m, size_t total_len, ...@@ -1646,7 +1647,8 @@ static int tun_recvmsg(struct socket *sock, struct msghdr *m, size_t total_len,
SOL_PACKET, TUN_TX_TIMESTAMP); SOL_PACKET, TUN_TX_TIMESTAMP);
goto out; goto out;
} }
ret = tun_do_read(tun, tfile, &m->msg_iter, flags & MSG_DONTWAIT); ret = tun_do_read(tun, tfile, &m->msg_iter, flags & MSG_DONTWAIT,
m->msg_control);
if (ret > (ssize_t)total_len) { if (ret > (ssize_t)total_len) {
m->msg_flags |= MSG_TRUNC; m->msg_flags |= MSG_TRUNC;
ret = flags & MSG_TRUNC ? ret : total_len; ret = flags & MSG_TRUNC ? ret : total_len;
...@@ -2626,6 +2628,19 @@ struct socket *tun_get_socket(struct file *file) ...@@ -2626,6 +2628,19 @@ struct socket *tun_get_socket(struct file *file)
} }
EXPORT_SYMBOL_GPL(tun_get_socket); EXPORT_SYMBOL_GPL(tun_get_socket);
struct skb_array *tun_get_skb_array(struct file *file)
{
struct tun_file *tfile;
if (file->f_op != &tun_fops)
return ERR_PTR(-EINVAL);
tfile = file->private_data;
if (!tfile)
return ERR_PTR(-EBADFD);
return &tfile->tx_array;
}
EXPORT_SYMBOL_GPL(tun_get_skb_array);
module_init(tun_init); module_init(tun_init);
module_exit(tun_cleanup); module_exit(tun_cleanup);
MODULE_DESCRIPTION(DRV_DESCRIPTION); MODULE_DESCRIPTION(DRV_DESCRIPTION);
......
...@@ -28,6 +28,8 @@ ...@@ -28,6 +28,8 @@
#include <linux/if_macvlan.h> #include <linux/if_macvlan.h>
#include <linux/if_tap.h> #include <linux/if_tap.h>
#include <linux/if_vlan.h> #include <linux/if_vlan.h>
#include <linux/skb_array.h>
#include <linux/skbuff.h>
#include <net/sock.h> #include <net/sock.h>
...@@ -85,6 +87,13 @@ struct vhost_net_ubuf_ref { ...@@ -85,6 +87,13 @@ struct vhost_net_ubuf_ref {
struct vhost_virtqueue *vq; struct vhost_virtqueue *vq;
}; };
#define VHOST_RX_BATCH 64
struct vhost_net_buf {
struct sk_buff **queue;
int tail;
int head;
};
struct vhost_net_virtqueue { struct vhost_net_virtqueue {
struct vhost_virtqueue vq; struct vhost_virtqueue vq;
size_t vhost_hlen; size_t vhost_hlen;
...@@ -99,6 +108,8 @@ struct vhost_net_virtqueue { ...@@ -99,6 +108,8 @@ struct vhost_net_virtqueue {
/* Reference counting for outstanding ubufs. /* Reference counting for outstanding ubufs.
* Protected by vq mutex. Writers must also take device mutex. */ * Protected by vq mutex. Writers must also take device mutex. */
struct vhost_net_ubuf_ref *ubufs; struct vhost_net_ubuf_ref *ubufs;
struct skb_array *rx_array;
struct vhost_net_buf rxq;
}; };
struct vhost_net { struct vhost_net {
...@@ -117,6 +128,71 @@ struct vhost_net { ...@@ -117,6 +128,71 @@ struct vhost_net {
static unsigned vhost_net_zcopy_mask __read_mostly; static unsigned vhost_net_zcopy_mask __read_mostly;
static void *vhost_net_buf_get_ptr(struct vhost_net_buf *rxq)
{
if (rxq->tail != rxq->head)
return rxq->queue[rxq->head];
else
return NULL;
}
static int vhost_net_buf_get_size(struct vhost_net_buf *rxq)
{
return rxq->tail - rxq->head;
}
static int vhost_net_buf_is_empty(struct vhost_net_buf *rxq)
{
return rxq->tail == rxq->head;
}
static void *vhost_net_buf_consume(struct vhost_net_buf *rxq)
{
void *ret = vhost_net_buf_get_ptr(rxq);
++rxq->head;
return ret;
}
static int vhost_net_buf_produce(struct vhost_net_virtqueue *nvq)
{
struct vhost_net_buf *rxq = &nvq->rxq;
rxq->head = 0;
rxq->tail = skb_array_consume_batched(nvq->rx_array, rxq->queue,
VHOST_RX_BATCH);
return rxq->tail;
}
static void vhost_net_buf_unproduce(struct vhost_net_virtqueue *nvq)
{
struct vhost_net_buf *rxq = &nvq->rxq;
if (nvq->rx_array && !vhost_net_buf_is_empty(rxq)) {
skb_array_unconsume(nvq->rx_array, rxq->queue + rxq->head,
vhost_net_buf_get_size(rxq));
rxq->head = rxq->tail = 0;
}
}
static int vhost_net_buf_peek(struct vhost_net_virtqueue *nvq)
{
struct vhost_net_buf *rxq = &nvq->rxq;
if (!vhost_net_buf_is_empty(rxq))
goto out;
if (!vhost_net_buf_produce(nvq))
return 0;
out:
return __skb_array_len_with_tag(vhost_net_buf_get_ptr(rxq));
}
static void vhost_net_buf_init(struct vhost_net_buf *rxq)
{
rxq->head = rxq->tail = 0;
}
static void vhost_net_enable_zcopy(int vq) static void vhost_net_enable_zcopy(int vq)
{ {
vhost_net_zcopy_mask |= 0x1 << vq; vhost_net_zcopy_mask |= 0x1 << vq;
...@@ -201,6 +277,7 @@ static void vhost_net_vq_reset(struct vhost_net *n) ...@@ -201,6 +277,7 @@ static void vhost_net_vq_reset(struct vhost_net *n)
n->vqs[i].ubufs = NULL; n->vqs[i].ubufs = NULL;
n->vqs[i].vhost_hlen = 0; n->vqs[i].vhost_hlen = 0;
n->vqs[i].sock_hlen = 0; n->vqs[i].sock_hlen = 0;
vhost_net_buf_init(&n->vqs[i].rxq);
} }
} }
...@@ -503,15 +580,14 @@ static void handle_tx(struct vhost_net *net) ...@@ -503,15 +580,14 @@ static void handle_tx(struct vhost_net *net)
mutex_unlock(&vq->mutex); mutex_unlock(&vq->mutex);
} }
static int peek_head_len(struct sock *sk) static int peek_head_len(struct vhost_net_virtqueue *rvq, struct sock *sk)
{ {
struct socket *sock = sk->sk_socket;
struct sk_buff *head; struct sk_buff *head;
int len = 0; int len = 0;
unsigned long flags; unsigned long flags;
if (sock->ops->peek_len) if (rvq->rx_array)
return sock->ops->peek_len(sock); return vhost_net_buf_peek(rvq);
spin_lock_irqsave(&sk->sk_receive_queue.lock, flags); spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
head = skb_peek(&sk->sk_receive_queue); head = skb_peek(&sk->sk_receive_queue);
...@@ -537,10 +613,11 @@ static int sk_has_rx_data(struct sock *sk) ...@@ -537,10 +613,11 @@ static int sk_has_rx_data(struct sock *sk)
static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk) static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
{ {
struct vhost_net_virtqueue *rvq = &net->vqs[VHOST_NET_VQ_RX];
struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX]; struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
struct vhost_virtqueue *vq = &nvq->vq; struct vhost_virtqueue *vq = &nvq->vq;
unsigned long uninitialized_var(endtime); unsigned long uninitialized_var(endtime);
int len = peek_head_len(sk); int len = peek_head_len(rvq, sk);
if (!len && vq->busyloop_timeout) { if (!len && vq->busyloop_timeout) {
/* Both tx vq and rx socket were polled here */ /* Both tx vq and rx socket were polled here */
...@@ -561,7 +638,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk) ...@@ -561,7 +638,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
vhost_poll_queue(&vq->poll); vhost_poll_queue(&vq->poll);
mutex_unlock(&vq->mutex); mutex_unlock(&vq->mutex);
len = peek_head_len(sk); len = peek_head_len(rvq, sk);
} }
return len; return len;
...@@ -699,6 +776,8 @@ static void handle_rx(struct vhost_net *net) ...@@ -699,6 +776,8 @@ static void handle_rx(struct vhost_net *net)
/* On error, stop handling until the next kick. */ /* On error, stop handling until the next kick. */
if (unlikely(headcount < 0)) if (unlikely(headcount < 0))
goto out; goto out;
if (nvq->rx_array)
msg.msg_control = vhost_net_buf_consume(&nvq->rxq);
/* On overrun, truncate and discard */ /* On overrun, truncate and discard */
if (unlikely(headcount > UIO_MAXIOV)) { if (unlikely(headcount > UIO_MAXIOV)) {
iov_iter_init(&msg.msg_iter, READ, vq->iov, 1, 1); iov_iter_init(&msg.msg_iter, READ, vq->iov, 1, 1);
...@@ -815,6 +894,7 @@ static int vhost_net_open(struct inode *inode, struct file *f) ...@@ -815,6 +894,7 @@ static int vhost_net_open(struct inode *inode, struct file *f)
struct vhost_net *n; struct vhost_net *n;
struct vhost_dev *dev; struct vhost_dev *dev;
struct vhost_virtqueue **vqs; struct vhost_virtqueue **vqs;
struct sk_buff **queue;
int i; int i;
n = kvmalloc(sizeof *n, GFP_KERNEL | __GFP_REPEAT); n = kvmalloc(sizeof *n, GFP_KERNEL | __GFP_REPEAT);
...@@ -826,6 +906,15 @@ static int vhost_net_open(struct inode *inode, struct file *f) ...@@ -826,6 +906,15 @@ static int vhost_net_open(struct inode *inode, struct file *f)
return -ENOMEM; return -ENOMEM;
} }
queue = kmalloc_array(VHOST_RX_BATCH, sizeof(struct sk_buff *),
GFP_KERNEL);
if (!queue) {
kfree(vqs);
kvfree(n);
return -ENOMEM;
}
n->vqs[VHOST_NET_VQ_RX].rxq.queue = queue;
dev = &n->dev; dev = &n->dev;
vqs[VHOST_NET_VQ_TX] = &n->vqs[VHOST_NET_VQ_TX].vq; vqs[VHOST_NET_VQ_TX] = &n->vqs[VHOST_NET_VQ_TX].vq;
vqs[VHOST_NET_VQ_RX] = &n->vqs[VHOST_NET_VQ_RX].vq; vqs[VHOST_NET_VQ_RX] = &n->vqs[VHOST_NET_VQ_RX].vq;
...@@ -838,6 +927,7 @@ static int vhost_net_open(struct inode *inode, struct file *f) ...@@ -838,6 +927,7 @@ static int vhost_net_open(struct inode *inode, struct file *f)
n->vqs[i].done_idx = 0; n->vqs[i].done_idx = 0;
n->vqs[i].vhost_hlen = 0; n->vqs[i].vhost_hlen = 0;
n->vqs[i].sock_hlen = 0; n->vqs[i].sock_hlen = 0;
vhost_net_buf_init(&n->vqs[i].rxq);
} }
vhost_dev_init(dev, vqs, VHOST_NET_VQ_MAX); vhost_dev_init(dev, vqs, VHOST_NET_VQ_MAX);
...@@ -853,11 +943,14 @@ static struct socket *vhost_net_stop_vq(struct vhost_net *n, ...@@ -853,11 +943,14 @@ static struct socket *vhost_net_stop_vq(struct vhost_net *n,
struct vhost_virtqueue *vq) struct vhost_virtqueue *vq)
{ {
struct socket *sock; struct socket *sock;
struct vhost_net_virtqueue *nvq =
container_of(vq, struct vhost_net_virtqueue, vq);
mutex_lock(&vq->mutex); mutex_lock(&vq->mutex);
sock = vq->private_data; sock = vq->private_data;
vhost_net_disable_vq(n, vq); vhost_net_disable_vq(n, vq);
vq->private_data = NULL; vq->private_data = NULL;
vhost_net_buf_unproduce(nvq);
mutex_unlock(&vq->mutex); mutex_unlock(&vq->mutex);
return sock; return sock;
} }
...@@ -912,6 +1005,7 @@ static int vhost_net_release(struct inode *inode, struct file *f) ...@@ -912,6 +1005,7 @@ static int vhost_net_release(struct inode *inode, struct file *f)
/* We do an extra flush before freeing memory, /* We do an extra flush before freeing memory,
* since jobs can re-queue themselves. */ * since jobs can re-queue themselves. */
vhost_net_flush(n); vhost_net_flush(n);
kfree(n->vqs[VHOST_NET_VQ_RX].rxq.queue);
kfree(n->dev.vqs); kfree(n->dev.vqs);
kvfree(n); kvfree(n);
return 0; return 0;
...@@ -950,6 +1044,25 @@ static struct socket *get_raw_socket(int fd) ...@@ -950,6 +1044,25 @@ static struct socket *get_raw_socket(int fd)
return ERR_PTR(r); return ERR_PTR(r);
} }
static struct skb_array *get_tap_skb_array(int fd)
{
struct skb_array *array;
struct file *file = fget(fd);
if (!file)
return NULL;
array = tun_get_skb_array(file);
if (!IS_ERR(array))
goto out;
array = tap_get_skb_array(file);
if (!IS_ERR(array))
goto out;
array = NULL;
out:
fput(file);
return array;
}
static struct socket *get_tap_socket(int fd) static struct socket *get_tap_socket(int fd)
{ {
struct file *file = fget(fd); struct file *file = fget(fd);
...@@ -1026,6 +1139,9 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd) ...@@ -1026,6 +1139,9 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
vhost_net_disable_vq(n, vq); vhost_net_disable_vq(n, vq);
vq->private_data = sock; vq->private_data = sock;
vhost_net_buf_unproduce(nvq);
if (index == VHOST_NET_VQ_RX)
nvq->rx_array = get_tap_skb_array(fd);
r = vhost_vq_init_access(vq); r = vhost_vq_init_access(vq);
if (r) if (r)
goto err_used; goto err_used;
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#if IS_ENABLED(CONFIG_TAP) #if IS_ENABLED(CONFIG_TAP)
struct socket *tap_get_socket(struct file *); struct socket *tap_get_socket(struct file *);
struct skb_array *tap_get_skb_array(struct file *file);
#else #else
#include <linux/err.h> #include <linux/err.h>
#include <linux/errno.h> #include <linux/errno.h>
...@@ -12,6 +13,10 @@ static inline struct socket *tap_get_socket(struct file *f) ...@@ -12,6 +13,10 @@ static inline struct socket *tap_get_socket(struct file *f)
{ {
return ERR_PTR(-EINVAL); return ERR_PTR(-EINVAL);
} }
static inline struct skb_array *tap_get_skb_array(struct file *f)
{
return ERR_PTR(-EINVAL);
}
#endif /* CONFIG_TAP */ #endif /* CONFIG_TAP */
#include <net/sock.h> #include <net/sock.h>
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#if defined(CONFIG_TUN) || defined(CONFIG_TUN_MODULE) #if defined(CONFIG_TUN) || defined(CONFIG_TUN_MODULE)
struct socket *tun_get_socket(struct file *); struct socket *tun_get_socket(struct file *);
struct skb_array *tun_get_skb_array(struct file *file);
#else #else
#include <linux/err.h> #include <linux/err.h>
#include <linux/errno.h> #include <linux/errno.h>
...@@ -28,5 +29,9 @@ static inline struct socket *tun_get_socket(struct file *f) ...@@ -28,5 +29,9 @@ static inline struct socket *tun_get_socket(struct file *f)
{ {
return ERR_PTR(-EINVAL); return ERR_PTR(-EINVAL);
} }
static inline struct skb_array *tun_get_skb_array(struct file *f)
{
return ERR_PTR(-EINVAL);
}
#endif /* CONFIG_TUN */ #endif /* CONFIG_TUN */
#endif /* __IF_TUN_H */ #endif /* __IF_TUN_H */
...@@ -278,6 +278,22 @@ static inline void *__ptr_ring_consume(struct ptr_ring *r) ...@@ -278,6 +278,22 @@ static inline void *__ptr_ring_consume(struct ptr_ring *r)
return ptr; return ptr;
} }
static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
void **array, int n)
{
void *ptr;
int i;
for (i = 0; i < n; i++) {
ptr = __ptr_ring_consume(r);
if (!ptr)
break;
array[i] = ptr;
}
return i;
}
/* /*
* Note: resize (below) nests producer lock within consumer lock, so if you * Note: resize (below) nests producer lock within consumer lock, so if you
* call this in interrupt or BH context, you must disable interrupts/BH when * call this in interrupt or BH context, you must disable interrupts/BH when
...@@ -328,6 +344,55 @@ static inline void *ptr_ring_consume_bh(struct ptr_ring *r) ...@@ -328,6 +344,55 @@ static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
return ptr; return ptr;
} }
static inline int ptr_ring_consume_batched(struct ptr_ring *r,
void **array, int n)
{
int ret;
spin_lock(&r->consumer_lock);
ret = __ptr_ring_consume_batched(r, array, n);
spin_unlock(&r->consumer_lock);
return ret;
}
static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
void **array, int n)
{
int ret;
spin_lock_irq(&r->consumer_lock);
ret = __ptr_ring_consume_batched(r, array, n);
spin_unlock_irq(&r->consumer_lock);
return ret;
}
static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
void **array, int n)
{
unsigned long flags;
int ret;
spin_lock_irqsave(&r->consumer_lock, flags);
ret = __ptr_ring_consume_batched(r, array, n);
spin_unlock_irqrestore(&r->consumer_lock, flags);
return ret;
}
static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
void **array, int n)
{
int ret;
spin_lock_bh(&r->consumer_lock);
ret = __ptr_ring_consume_batched(r, array, n);
spin_unlock_bh(&r->consumer_lock);
return ret;
}
/* Cast to structure type and call a function without discarding from FIFO. /* Cast to structure type and call a function without discarding from FIFO.
* Function must return a value. * Function must return a value.
* Callers must take consumer_lock. * Callers must take consumer_lock.
...@@ -403,6 +468,61 @@ static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp) ...@@ -403,6 +468,61 @@ static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
return 0; return 0;
} }
/*
* Return entries into ring. Destroy entries that don't fit.
*
* Note: this is expected to be a rare slow path operation.
*
* Note: producer lock is nested within consumer lock, so if you
* resize you must make sure all uses nest correctly.
* In particular if you consume ring in interrupt or BH context, you must
* disable interrupts/BH when doing so.
*/
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
void (*destroy)(void *))
{
unsigned long flags;
int head;
spin_lock_irqsave(&r->consumer_lock, flags);
spin_lock(&r->producer_lock);
if (!r->size)
goto done;
/*
* Clean out buffered entries (for simplicity). This way following code
* can test entries for NULL and if not assume they are valid.
*/
head = r->consumer_head - 1;
while (likely(head >= r->consumer_tail))
r->queue[head--] = NULL;
r->consumer_tail = r->consumer_head;
/*
* Go over entries in batch, start moving head back and copy entries.
* Stop when we run into previously unconsumed entries.
*/
while (n) {
head = r->consumer_head - 1;
if (head < 0)
head = r->size - 1;
if (r->queue[head]) {
/* This batch entry will have to be destroyed. */
goto done;
}
r->queue[head] = batch[--n];
r->consumer_tail = r->consumer_head = head;
}
done:
/* Destroy all entries left in the batch. */
while (n)
destroy(batch[--n]);
spin_unlock(&r->producer_lock);
spin_unlock_irqrestore(&r->consumer_lock, flags);
}
static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue, static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
int size, gfp_t gfp, int size, gfp_t gfp,
void (*destroy)(void *)) void (*destroy)(void *))
......
...@@ -97,21 +97,46 @@ static inline struct sk_buff *skb_array_consume(struct skb_array *a) ...@@ -97,21 +97,46 @@ static inline struct sk_buff *skb_array_consume(struct skb_array *a)
return ptr_ring_consume(&a->ring); return ptr_ring_consume(&a->ring);
} }
static inline int skb_array_consume_batched(struct skb_array *a,
struct sk_buff **array, int n)
{
return ptr_ring_consume_batched(&a->ring, (void **)array, n);
}
static inline struct sk_buff *skb_array_consume_irq(struct skb_array *a) static inline struct sk_buff *skb_array_consume_irq(struct skb_array *a)
{ {
return ptr_ring_consume_irq(&a->ring); return ptr_ring_consume_irq(&a->ring);
} }
static inline int skb_array_consume_batched_irq(struct skb_array *a,
struct sk_buff **array, int n)
{
return ptr_ring_consume_batched_irq(&a->ring, (void **)array, n);
}
static inline struct sk_buff *skb_array_consume_any(struct skb_array *a) static inline struct sk_buff *skb_array_consume_any(struct skb_array *a)
{ {
return ptr_ring_consume_any(&a->ring); return ptr_ring_consume_any(&a->ring);
} }
static inline int skb_array_consume_batched_any(struct skb_array *a,
struct sk_buff **array, int n)
{
return ptr_ring_consume_batched_any(&a->ring, (void **)array, n);
}
static inline struct sk_buff *skb_array_consume_bh(struct skb_array *a) static inline struct sk_buff *skb_array_consume_bh(struct skb_array *a)
{ {
return ptr_ring_consume_bh(&a->ring); return ptr_ring_consume_bh(&a->ring);
} }
static inline int skb_array_consume_batched_bh(struct skb_array *a,
struct sk_buff **array, int n)
{
return ptr_ring_consume_batched_bh(&a->ring, (void **)array, n);
}
static inline int __skb_array_len_with_tag(struct sk_buff *skb) static inline int __skb_array_len_with_tag(struct sk_buff *skb)
{ {
if (likely(skb)) { if (likely(skb)) {
...@@ -156,6 +181,12 @@ static void __skb_array_destroy_skb(void *ptr) ...@@ -156,6 +181,12 @@ static void __skb_array_destroy_skb(void *ptr)
kfree_skb(ptr); kfree_skb(ptr);
} }
static inline void skb_array_unconsume(struct skb_array *a,
struct sk_buff **skbs, int n)
{
ptr_ring_unconsume(&a->ring, (void **)skbs, n, __skb_array_destroy_skb);
}
static inline int skb_array_resize(struct skb_array *a, int size, gfp_t gfp) static inline int skb_array_resize(struct skb_array *a, int size, gfp_t gfp)
{ {
return ptr_ring_resize(&a->ring, size, gfp, __skb_array_destroy_skb); return ptr_ring_resize(&a->ring, size, gfp, __skb_array_destroy_skb);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment