Commit 43815482 authored by Eric Dumazet, committed by David S. Miller

net: sock_def_readable() and friends RCU conversion

sk_callback_lock rwlock actually protects sk->sk_sleep pointer, so we
need two atomic operations (and associated dirtying) per incoming
packet.

RCU conversion is pretty much needed :

1) Add a new structure, called "struct socket_wq" to hold all fields
that will need rcu_read_lock() protection (currently: a
wait_queue_head_t and a struct fasync_struct pointer).

[Future patch will add a list anchor for wakeup coalescing]

2) Attach one such structure to each "struct socket" created in
sock_alloc_inode().

3) Respect RCU grace period when freeing a "struct socket_wq"

4) Replace the sk_sleep pointer in "struct sock" with sk_wq, a pointer to
"struct socket_wq".

5) Change sk_sleep() function to use new sk->sk_wq instead of
sk->sk_sleep

6) Replace sk_has_sleeper() with wq_has_sleeper(), which must be called
inside an rcu_read_lock() section.

7) Change all sk_has_sleeper() callers to :
  - Use rcu_read_lock() instead of read_lock(&sk->sk_callback_lock)
  - Use wq_has_sleeper() to decide whether tasks need to be woken up.
  - Use rcu_read_unlock() instead of read_unlock(&sk->sk_callback_lock)

8) sock_wake_async() is modified to use rcu protection as well.

9) Exceptions :
  macvtap, drivers/net/tun.c and af_unix use an embedded "struct socket_wq"
instead of a dynamically allocated one. They don't need RCU freeing.

Some cleanups or follow-ups are probably needed (for example, a possible
conversion of sk_callback_lock to a spinlock).
Signed-off-by: Eric Dumazet <eric.dumazet@gmail.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
parent 83d7eb29
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
struct macvtap_queue { struct macvtap_queue {
struct sock sk; struct sock sk;
struct socket sock; struct socket sock;
struct socket_wq wq;
struct macvlan_dev *vlan; struct macvlan_dev *vlan;
struct file *file; struct file *file;
unsigned int flags; unsigned int flags;
...@@ -242,12 +243,15 @@ static struct rtnl_link_ops macvtap_link_ops __read_mostly = { ...@@ -242,12 +243,15 @@ static struct rtnl_link_ops macvtap_link_ops __read_mostly = {
static void macvtap_sock_write_space(struct sock *sk) static void macvtap_sock_write_space(struct sock *sk)
{ {
wait_queue_head_t *wqueue;
if (!sock_writeable(sk) || if (!sock_writeable(sk) ||
!test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags)) !test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags))
return; return;
if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk))) wqueue = sk_sleep(sk);
wake_up_interruptible_poll(sk_sleep(sk), POLLOUT | POLLWRNORM | POLLWRBAND); if (wqueue && waitqueue_active(wqueue))
wake_up_interruptible_poll(wqueue, POLLOUT | POLLWRNORM | POLLWRBAND);
} }
static int macvtap_open(struct inode *inode, struct file *file) static int macvtap_open(struct inode *inode, struct file *file)
...@@ -272,7 +276,8 @@ static int macvtap_open(struct inode *inode, struct file *file) ...@@ -272,7 +276,8 @@ static int macvtap_open(struct inode *inode, struct file *file)
if (!q) if (!q)
goto out; goto out;
init_waitqueue_head(&q->sock.wait); q->sock.wq = &q->wq;
init_waitqueue_head(&q->wq.wait);
q->sock.type = SOCK_RAW; q->sock.type = SOCK_RAW;
q->sock.state = SS_CONNECTED; q->sock.state = SS_CONNECTED;
q->sock.file = file; q->sock.file = file;
...@@ -308,7 +313,7 @@ static unsigned int macvtap_poll(struct file *file, poll_table * wait) ...@@ -308,7 +313,7 @@ static unsigned int macvtap_poll(struct file *file, poll_table * wait)
goto out; goto out;
mask = 0; mask = 0;
poll_wait(file, &q->sock.wait, wait); poll_wait(file, &q->wq.wait, wait);
if (!skb_queue_empty(&q->sk.sk_receive_queue)) if (!skb_queue_empty(&q->sk.sk_receive_queue))
mask |= POLLIN | POLLRDNORM; mask |= POLLIN | POLLRDNORM;
......
...@@ -109,7 +109,7 @@ struct tun_struct { ...@@ -109,7 +109,7 @@ struct tun_struct {
struct tap_filter txflt; struct tap_filter txflt;
struct socket socket; struct socket socket;
struct socket_wq wq;
#ifdef TUN_DEBUG #ifdef TUN_DEBUG
int debug; int debug;
#endif #endif
...@@ -323,7 +323,7 @@ static void tun_net_uninit(struct net_device *dev) ...@@ -323,7 +323,7 @@ static void tun_net_uninit(struct net_device *dev)
/* Inform the methods they need to stop using the dev. /* Inform the methods they need to stop using the dev.
*/ */
if (tfile) { if (tfile) {
wake_up_all(&tun->socket.wait); wake_up_all(&tun->wq.wait);
if (atomic_dec_and_test(&tfile->count)) if (atomic_dec_and_test(&tfile->count))
__tun_detach(tun); __tun_detach(tun);
} }
...@@ -398,7 +398,7 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev) ...@@ -398,7 +398,7 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
/* Notify and wake up reader process */ /* Notify and wake up reader process */
if (tun->flags & TUN_FASYNC) if (tun->flags & TUN_FASYNC)
kill_fasync(&tun->fasync, SIGIO, POLL_IN); kill_fasync(&tun->fasync, SIGIO, POLL_IN);
wake_up_interruptible_poll(&tun->socket.wait, POLLIN | wake_up_interruptible_poll(&tun->wq.wait, POLLIN |
POLLRDNORM | POLLRDBAND); POLLRDNORM | POLLRDBAND);
return NETDEV_TX_OK; return NETDEV_TX_OK;
...@@ -498,7 +498,7 @@ static unsigned int tun_chr_poll(struct file *file, poll_table * wait) ...@@ -498,7 +498,7 @@ static unsigned int tun_chr_poll(struct file *file, poll_table * wait)
DBG(KERN_INFO "%s: tun_chr_poll\n", tun->dev->name); DBG(KERN_INFO "%s: tun_chr_poll\n", tun->dev->name);
poll_wait(file, &tun->socket.wait, wait); poll_wait(file, &tun->wq.wait, wait);
if (!skb_queue_empty(&sk->sk_receive_queue)) if (!skb_queue_empty(&sk->sk_receive_queue))
mask |= POLLIN | POLLRDNORM; mask |= POLLIN | POLLRDNORM;
...@@ -773,7 +773,7 @@ static ssize_t tun_do_read(struct tun_struct *tun, ...@@ -773,7 +773,7 @@ static ssize_t tun_do_read(struct tun_struct *tun,
DBG(KERN_INFO "%s: tun_chr_read\n", tun->dev->name); DBG(KERN_INFO "%s: tun_chr_read\n", tun->dev->name);
add_wait_queue(&tun->socket.wait, &wait); add_wait_queue(&tun->wq.wait, &wait);
while (len) { while (len) {
current->state = TASK_INTERRUPTIBLE; current->state = TASK_INTERRUPTIBLE;
...@@ -804,7 +804,7 @@ static ssize_t tun_do_read(struct tun_struct *tun, ...@@ -804,7 +804,7 @@ static ssize_t tun_do_read(struct tun_struct *tun,
} }
current->state = TASK_RUNNING; current->state = TASK_RUNNING;
remove_wait_queue(&tun->socket.wait, &wait); remove_wait_queue(&tun->wq.wait, &wait);
return ret; return ret;
} }
...@@ -861,6 +861,7 @@ static struct rtnl_link_ops tun_link_ops __read_mostly = { ...@@ -861,6 +861,7 @@ static struct rtnl_link_ops tun_link_ops __read_mostly = {
static void tun_sock_write_space(struct sock *sk) static void tun_sock_write_space(struct sock *sk)
{ {
struct tun_struct *tun; struct tun_struct *tun;
wait_queue_head_t *wqueue;
if (!sock_writeable(sk)) if (!sock_writeable(sk))
return; return;
...@@ -868,8 +869,9 @@ static void tun_sock_write_space(struct sock *sk) ...@@ -868,8 +869,9 @@ static void tun_sock_write_space(struct sock *sk)
if (!test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags)) if (!test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags))
return; return;
if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk))) wqueue = sk_sleep(sk);
wake_up_interruptible_sync_poll(sk_sleep(sk), POLLOUT | if (wqueue && waitqueue_active(wqueue))
wake_up_interruptible_sync_poll(wqueue, POLLOUT |
POLLWRNORM | POLLWRBAND); POLLWRNORM | POLLWRBAND);
tun = tun_sk(sk)->tun; tun = tun_sk(sk)->tun;
...@@ -1039,7 +1041,8 @@ static int tun_set_iff(struct net *net, struct file *file, struct ifreq *ifr) ...@@ -1039,7 +1041,8 @@ static int tun_set_iff(struct net *net, struct file *file, struct ifreq *ifr)
if (!sk) if (!sk)
goto err_free_dev; goto err_free_dev;
init_waitqueue_head(&tun->socket.wait); tun->socket.wq = &tun->wq;
init_waitqueue_head(&tun->wq.wait);
tun->socket.ops = &tun_socket_ops; tun->socket.ops = &tun_socket_ops;
sock_init_data(&tun->socket, sk); sock_init_data(&tun->socket, sk);
sk->sk_write_space = tun_sock_write_space; sk->sk_write_space = tun_sock_write_space;
......
...@@ -59,6 +59,7 @@ typedef enum { ...@@ -59,6 +59,7 @@ typedef enum {
#include <linux/wait.h> #include <linux/wait.h>
#include <linux/fcntl.h> /* For O_CLOEXEC and O_NONBLOCK */ #include <linux/fcntl.h> /* For O_CLOEXEC and O_NONBLOCK */
#include <linux/kmemcheck.h> #include <linux/kmemcheck.h>
#include <linux/rcupdate.h>
struct poll_table_struct; struct poll_table_struct;
struct pipe_inode_info; struct pipe_inode_info;
...@@ -116,6 +117,12 @@ enum sock_shutdown_cmd { ...@@ -116,6 +117,12 @@ enum sock_shutdown_cmd {
SHUT_RDWR = 2, SHUT_RDWR = 2,
}; };
struct socket_wq {
wait_queue_head_t wait;
struct fasync_struct *fasync_list;
struct rcu_head rcu;
} ____cacheline_aligned_in_smp;
/** /**
* struct socket - general BSD socket * struct socket - general BSD socket
* @state: socket state (%SS_CONNECTED, etc) * @state: socket state (%SS_CONNECTED, etc)
...@@ -135,11 +142,8 @@ struct socket { ...@@ -135,11 +142,8 @@ struct socket {
kmemcheck_bitfield_end(type); kmemcheck_bitfield_end(type);
unsigned long flags; unsigned long flags;
/*
* Please keep fasync_list & wait fields in the same cache line struct socket_wq *wq;
*/
struct fasync_struct *fasync_list;
wait_queue_head_t wait;
struct file *file; struct file *file;
struct sock *sk; struct sock *sk;
......
...@@ -30,7 +30,7 @@ struct unix_skb_parms { ...@@ -30,7 +30,7 @@ struct unix_skb_parms {
#endif #endif
}; };
#define UNIXCB(skb) (*(struct unix_skb_parms*)&((skb)->cb)) #define UNIXCB(skb) (*(struct unix_skb_parms *)&((skb)->cb))
#define UNIXCREDS(skb) (&UNIXCB((skb)).creds) #define UNIXCREDS(skb) (&UNIXCB((skb)).creds)
#define UNIXSID(skb) (&UNIXCB((skb)).secid) #define UNIXSID(skb) (&UNIXCB((skb)).secid)
...@@ -56,10 +56,12 @@ struct unix_sock { ...@@ -56,10 +56,12 @@ struct unix_sock {
spinlock_t lock; spinlock_t lock;
unsigned int gc_candidate : 1; unsigned int gc_candidate : 1;
unsigned int gc_maybe_cycle : 1; unsigned int gc_maybe_cycle : 1;
wait_queue_head_t peer_wait; struct socket_wq peer_wq;
}; };
#define unix_sk(__sk) ((struct unix_sock *)__sk) #define unix_sk(__sk) ((struct unix_sock *)__sk)
#define peer_wait peer_wq.wait
#ifdef CONFIG_SYSCTL #ifdef CONFIG_SYSCTL
extern int unix_sysctl_register(struct net *net); extern int unix_sysctl_register(struct net *net);
extern void unix_sysctl_unregister(struct net *net); extern void unix_sysctl_unregister(struct net *net);
......
...@@ -159,7 +159,7 @@ struct sock_common { ...@@ -159,7 +159,7 @@ struct sock_common {
* @sk_userlocks: %SO_SNDBUF and %SO_RCVBUF settings * @sk_userlocks: %SO_SNDBUF and %SO_RCVBUF settings
* @sk_lock: synchronizer * @sk_lock: synchronizer
* @sk_rcvbuf: size of receive buffer in bytes * @sk_rcvbuf: size of receive buffer in bytes
* @sk_sleep: sock wait queue * @sk_wq: sock wait queue and async head
* @sk_dst_cache: destination cache * @sk_dst_cache: destination cache
* @sk_dst_lock: destination cache lock * @sk_dst_lock: destination cache lock
* @sk_policy: flow policy * @sk_policy: flow policy
...@@ -257,7 +257,7 @@ struct sock { ...@@ -257,7 +257,7 @@ struct sock {
struct sk_buff *tail; struct sk_buff *tail;
int len; int len;
} sk_backlog; } sk_backlog;
wait_queue_head_t *sk_sleep; struct socket_wq *sk_wq;
struct dst_entry *sk_dst_cache; struct dst_entry *sk_dst_cache;
#ifdef CONFIG_XFRM #ifdef CONFIG_XFRM
struct xfrm_policy *sk_policy[2]; struct xfrm_policy *sk_policy[2];
...@@ -1219,7 +1219,7 @@ static inline void sk_set_socket(struct sock *sk, struct socket *sock) ...@@ -1219,7 +1219,7 @@ static inline void sk_set_socket(struct sock *sk, struct socket *sock)
static inline wait_queue_head_t *sk_sleep(struct sock *sk) static inline wait_queue_head_t *sk_sleep(struct sock *sk)
{ {
return sk->sk_sleep; return &sk->sk_wq->wait;
} }
/* Detach socket from process context. /* Detach socket from process context.
* Announce socket dead, detach it from wait queue and inode. * Announce socket dead, detach it from wait queue and inode.
...@@ -1233,14 +1233,14 @@ static inline void sock_orphan(struct sock *sk) ...@@ -1233,14 +1233,14 @@ static inline void sock_orphan(struct sock *sk)
write_lock_bh(&sk->sk_callback_lock); write_lock_bh(&sk->sk_callback_lock);
sock_set_flag(sk, SOCK_DEAD); sock_set_flag(sk, SOCK_DEAD);
sk_set_socket(sk, NULL); sk_set_socket(sk, NULL);
sk->sk_sleep = NULL; sk->sk_wq = NULL;
write_unlock_bh(&sk->sk_callback_lock); write_unlock_bh(&sk->sk_callback_lock);
} }
static inline void sock_graft(struct sock *sk, struct socket *parent) static inline void sock_graft(struct sock *sk, struct socket *parent)
{ {
write_lock_bh(&sk->sk_callback_lock); write_lock_bh(&sk->sk_callback_lock);
sk->sk_sleep = &parent->wait; rcu_assign_pointer(sk->sk_wq, parent->wq);
parent->sk = sk; parent->sk = sk;
sk_set_socket(sk, parent); sk_set_socket(sk, parent);
security_sock_graft(sk, parent); security_sock_graft(sk, parent);
...@@ -1392,12 +1392,12 @@ static inline int sk_has_allocations(const struct sock *sk) ...@@ -1392,12 +1392,12 @@ static inline int sk_has_allocations(const struct sock *sk)
} }
/** /**
* sk_has_sleeper - check if there are any waiting processes * wq_has_sleeper - check if there are any waiting processes
* @sk: socket * @sk: struct socket_wq
* *
* Returns true if socket has waiting processes * Returns true if socket_wq has waiting processes
* *
* The purpose of the sk_has_sleeper and sock_poll_wait is to wrap the memory * The purpose of the wq_has_sleeper and sock_poll_wait is to wrap the memory
* barrier call. They were added due to the race found within the tcp code. * barrier call. They were added due to the race found within the tcp code.
* *
* Consider following tcp code paths: * Consider following tcp code paths:
...@@ -1410,9 +1410,10 @@ static inline int sk_has_allocations(const struct sock *sk) ...@@ -1410,9 +1410,10 @@ static inline int sk_has_allocations(const struct sock *sk)
* ... ... * ... ...
* tp->rcv_nxt check sock_def_readable * tp->rcv_nxt check sock_def_readable
* ... { * ... {
* schedule ... * schedule rcu_read_lock();
* if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk))) * wq = rcu_dereference(sk->sk_wq);
* wake_up_interruptible(sk_sleep(sk)) * if (wq && waitqueue_active(&wq->wait))
* wake_up_interruptible(&wq->wait)
* ... * ...
* } * }
* *
...@@ -1421,19 +1422,18 @@ static inline int sk_has_allocations(const struct sock *sk) ...@@ -1421,19 +1422,18 @@ static inline int sk_has_allocations(const struct sock *sk)
* could then endup calling schedule and sleep forever if there are no more * could then endup calling schedule and sleep forever if there are no more
* data on the socket. * data on the socket.
* *
* The sk_has_sleeper is always called right after a call to read_lock, so we
* can use smp_mb__after_lock barrier.
*/ */
static inline int sk_has_sleeper(struct sock *sk) static inline bool wq_has_sleeper(struct socket_wq *wq)
{ {
/* /*
* We need to be sure we are in sync with the * We need to be sure we are in sync with the
* add_wait_queue modifications to the wait queue. * add_wait_queue modifications to the wait queue.
* *
* This memory barrier is paired in the sock_poll_wait. * This memory barrier is paired in the sock_poll_wait.
*/ */
smp_mb__after_lock(); smp_mb();
return sk_sleep(sk) && waitqueue_active(sk_sleep(sk)); return wq && waitqueue_active(&wq->wait);
} }
/** /**
...@@ -1442,7 +1442,7 @@ static inline int sk_has_sleeper(struct sock *sk) ...@@ -1442,7 +1442,7 @@ static inline int sk_has_sleeper(struct sock *sk)
* @wait_address: socket wait queue * @wait_address: socket wait queue
* @p: poll_table * @p: poll_table
* *
* See the comments in the sk_has_sleeper function. * See the comments in the wq_has_sleeper function.
*/ */
static inline void sock_poll_wait(struct file *filp, static inline void sock_poll_wait(struct file *filp,
wait_queue_head_t *wait_address, poll_table *p) wait_queue_head_t *wait_address, poll_table *p)
...@@ -1453,7 +1453,7 @@ static inline void sock_poll_wait(struct file *filp, ...@@ -1453,7 +1453,7 @@ static inline void sock_poll_wait(struct file *filp,
* We need to be sure we are in sync with the * We need to be sure we are in sync with the
* socket flags modification. * socket flags modification.
* *
* This memory barrier is paired in the sk_has_sleeper. * This memory barrier is paired in the wq_has_sleeper.
*/ */
smp_mb(); smp_mb();
} }
......
...@@ -90,10 +90,13 @@ static void vcc_sock_destruct(struct sock *sk) ...@@ -90,10 +90,13 @@ static void vcc_sock_destruct(struct sock *sk)
static void vcc_def_wakeup(struct sock *sk) static void vcc_def_wakeup(struct sock *sk)
{ {
read_lock(&sk->sk_callback_lock); struct socket_wq *wq;
if (sk_has_sleeper(sk))
wake_up(sk_sleep(sk)); rcu_read_lock();
read_unlock(&sk->sk_callback_lock); wq = rcu_dereference(sk->sk_wq);
if (wq_has_sleeper(wq))
wake_up(&wq->wait);
rcu_read_unlock();
} }
static inline int vcc_writable(struct sock *sk) static inline int vcc_writable(struct sock *sk)
...@@ -106,16 +109,19 @@ static inline int vcc_writable(struct sock *sk) ...@@ -106,16 +109,19 @@ static inline int vcc_writable(struct sock *sk)
static void vcc_write_space(struct sock *sk) static void vcc_write_space(struct sock *sk)
{ {
read_lock(&sk->sk_callback_lock); struct socket_wq *wq;
rcu_read_lock();
if (vcc_writable(sk)) { if (vcc_writable(sk)) {
if (sk_has_sleeper(sk)) wq = rcu_dereference(sk->sk_wq);
wake_up_interruptible(sk_sleep(sk)); if (wq_has_sleeper(wq))
wake_up_interruptible(&wq->wait);
sk_wake_async(sk, SOCK_WAKE_SPACE, POLL_OUT); sk_wake_async(sk, SOCK_WAKE_SPACE, POLL_OUT);
} }
read_unlock(&sk->sk_callback_lock); rcu_read_unlock();
} }
static struct proto vcc_proto = { static struct proto vcc_proto = {
......
...@@ -1211,7 +1211,7 @@ struct sock *sk_clone(const struct sock *sk, const gfp_t priority) ...@@ -1211,7 +1211,7 @@ struct sock *sk_clone(const struct sock *sk, const gfp_t priority)
*/ */
sk_refcnt_debug_inc(newsk); sk_refcnt_debug_inc(newsk);
sk_set_socket(newsk, NULL); sk_set_socket(newsk, NULL);
newsk->sk_sleep = NULL; newsk->sk_wq = NULL;
if (newsk->sk_prot->sockets_allocated) if (newsk->sk_prot->sockets_allocated)
percpu_counter_inc(newsk->sk_prot->sockets_allocated); percpu_counter_inc(newsk->sk_prot->sockets_allocated);
...@@ -1800,41 +1800,53 @@ EXPORT_SYMBOL(sock_no_sendpage); ...@@ -1800,41 +1800,53 @@ EXPORT_SYMBOL(sock_no_sendpage);
static void sock_def_wakeup(struct sock *sk) static void sock_def_wakeup(struct sock *sk)
{ {
read_lock(&sk->sk_callback_lock); struct socket_wq *wq;
if (sk_has_sleeper(sk))
wake_up_interruptible_all(sk_sleep(sk)); rcu_read_lock();
read_unlock(&sk->sk_callback_lock); wq = rcu_dereference(sk->sk_wq);
if (wq_has_sleeper(wq))
wake_up_interruptible_all(&wq->wait);
rcu_read_unlock();
} }
static void sock_def_error_report(struct sock *sk) static void sock_def_error_report(struct sock *sk)
{ {
read_lock(&sk->sk_callback_lock); struct socket_wq *wq;
if (sk_has_sleeper(sk))
wake_up_interruptible_poll(sk_sleep(sk), POLLERR); rcu_read_lock();
wq = rcu_dereference(sk->sk_wq);
if (wq_has_sleeper(wq))
wake_up_interruptible_poll(&wq->wait, POLLERR);
sk_wake_async(sk, SOCK_WAKE_IO, POLL_ERR); sk_wake_async(sk, SOCK_WAKE_IO, POLL_ERR);
read_unlock(&sk->sk_callback_lock); rcu_read_unlock();
} }
static void sock_def_readable(struct sock *sk, int len) static void sock_def_readable(struct sock *sk, int len)
{ {
read_lock(&sk->sk_callback_lock); struct socket_wq *wq;
if (sk_has_sleeper(sk))
wake_up_interruptible_sync_poll(sk_sleep(sk), POLLIN | rcu_read_lock();
wq = rcu_dereference(sk->sk_wq);
if (wq_has_sleeper(wq))
wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
POLLRDNORM | POLLRDBAND); POLLRDNORM | POLLRDBAND);
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN); sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
read_unlock(&sk->sk_callback_lock); rcu_read_unlock();
} }
static void sock_def_write_space(struct sock *sk) static void sock_def_write_space(struct sock *sk)
{ {
read_lock(&sk->sk_callback_lock); struct socket_wq *wq;
rcu_read_lock();
/* Do not wake up a writer until he can make "significant" /* Do not wake up a writer until he can make "significant"
* progress. --DaveM * progress. --DaveM
*/ */
if ((atomic_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf) { if ((atomic_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf) {
if (sk_has_sleeper(sk)) wq = rcu_dereference(sk->sk_wq);
wake_up_interruptible_sync_poll(sk_sleep(sk), POLLOUT | if (wq_has_sleeper(wq))
wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
POLLWRNORM | POLLWRBAND); POLLWRNORM | POLLWRBAND);
/* Should agree with poll, otherwise some programs break */ /* Should agree with poll, otherwise some programs break */
...@@ -1842,7 +1854,7 @@ static void sock_def_write_space(struct sock *sk) ...@@ -1842,7 +1854,7 @@ static void sock_def_write_space(struct sock *sk)
sk_wake_async(sk, SOCK_WAKE_SPACE, POLL_OUT); sk_wake_async(sk, SOCK_WAKE_SPACE, POLL_OUT);
} }
read_unlock(&sk->sk_callback_lock); rcu_read_unlock();
} }
static void sock_def_destruct(struct sock *sk) static void sock_def_destruct(struct sock *sk)
...@@ -1896,10 +1908,10 @@ void sock_init_data(struct socket *sock, struct sock *sk) ...@@ -1896,10 +1908,10 @@ void sock_init_data(struct socket *sock, struct sock *sk)
if (sock) { if (sock) {
sk->sk_type = sock->type; sk->sk_type = sock->type;
sk->sk_sleep = &sock->wait; sk->sk_wq = sock->wq;
sock->sk = sk; sock->sk = sk;
} else } else
sk->sk_sleep = NULL; sk->sk_wq = NULL;
spin_lock_init(&sk->sk_dst_lock); spin_lock_init(&sk->sk_dst_lock);
rwlock_init(&sk->sk_callback_lock); rwlock_init(&sk->sk_callback_lock);
......
...@@ -28,15 +28,19 @@ ...@@ -28,15 +28,19 @@
void sk_stream_write_space(struct sock *sk) void sk_stream_write_space(struct sock *sk)
{ {
struct socket *sock = sk->sk_socket; struct socket *sock = sk->sk_socket;
struct socket_wq *wq;
if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk) && sock) { if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk) && sock) {
clear_bit(SOCK_NOSPACE, &sock->flags); clear_bit(SOCK_NOSPACE, &sock->flags);
if (sk_sleep(sk) && waitqueue_active(sk_sleep(sk))) rcu_read_lock();
wake_up_interruptible_poll(sk_sleep(sk), POLLOUT | wq = rcu_dereference(sk->sk_wq);
if (wq_has_sleeper(wq))
wake_up_interruptible_poll(&wq->wait, POLLOUT |
POLLWRNORM | POLLWRBAND); POLLWRNORM | POLLWRBAND);
if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN)) if (wq && wq->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
sock_wake_async(sock, SOCK_WAKE_SPACE, POLL_OUT); sock_wake_async(sock, SOCK_WAKE_SPACE, POLL_OUT);
rcu_read_unlock();
} }
} }
......
...@@ -195,15 +195,17 @@ EXPORT_SYMBOL_GPL(dccp_sync_mss); ...@@ -195,15 +195,17 @@ EXPORT_SYMBOL_GPL(dccp_sync_mss);
void dccp_write_space(struct sock *sk) void dccp_write_space(struct sock *sk)
{ {
read_lock(&sk->sk_callback_lock); struct socket_wq *wq;
if (sk_has_sleeper(sk)) rcu_read_lock();
wake_up_interruptible(sk_sleep(sk)); wq = rcu_dereference(sk->sk_wq);
if (wq_has_sleeper(wq))
wake_up_interruptible(&wq->wait);
/* Should agree with poll, otherwise some programs break */ /* Should agree with poll, otherwise some programs break */
if (sock_writeable(sk)) if (sock_writeable(sk))
sk_wake_async(sk, SOCK_WAKE_SPACE, POLL_OUT); sk_wake_async(sk, SOCK_WAKE_SPACE, POLL_OUT);
read_unlock(&sk->sk_callback_lock); rcu_read_unlock();
} }
/** /**
......
...@@ -305,11 +305,14 @@ static inline int iucv_below_msglim(struct sock *sk) ...@@ -305,11 +305,14 @@ static inline int iucv_below_msglim(struct sock *sk)
*/ */
static void iucv_sock_wake_msglim(struct sock *sk) static void iucv_sock_wake_msglim(struct sock *sk)
{ {
read_lock(&sk->sk_callback_lock); struct socket_wq *wq;
if (sk_has_sleeper(sk))
wake_up_interruptible_all(sk_sleep(sk)); rcu_read_lock();
wq = rcu_dereference(sk->sk_wq);
if (wq_has_sleeper(wq))
wake_up_interruptible_all(&wq->wait);
sk_wake_async(sk, SOCK_WAKE_SPACE, POLL_OUT); sk_wake_async(sk, SOCK_WAKE_SPACE, POLL_OUT);
read_unlock(&sk->sk_callback_lock); rcu_read_unlock();
} }
/* Timers */ /* Timers */
......
...@@ -664,12 +664,12 @@ static int pep_wait_connreq(struct sock *sk, int noblock) ...@@ -664,12 +664,12 @@ static int pep_wait_connreq(struct sock *sk, int noblock)
if (signal_pending(tsk)) if (signal_pending(tsk))
return sock_intr_errno(timeo); return sock_intr_errno(timeo);
prepare_to_wait_exclusive(&sk->sk_socket->wait, &wait, prepare_to_wait_exclusive(sk_sleep(sk), &wait,
TASK_INTERRUPTIBLE); TASK_INTERRUPTIBLE);
release_sock(sk); release_sock(sk);
timeo = schedule_timeout(timeo); timeo = schedule_timeout(timeo);
lock_sock(sk); lock_sock(sk);
finish_wait(&sk->sk_socket->wait, &wait); finish_wait(sk_sleep(sk), &wait);
} }
return 0; return 0;
...@@ -910,10 +910,10 @@ static int pep_sendmsg(struct kiocb *iocb, struct sock *sk, ...@@ -910,10 +910,10 @@ static int pep_sendmsg(struct kiocb *iocb, struct sock *sk,
goto out; goto out;
} }
prepare_to_wait(&sk->sk_socket->wait, &wait, prepare_to_wait(sk_sleep(sk), &wait,
TASK_INTERRUPTIBLE); TASK_INTERRUPTIBLE);
done = sk_wait_event(sk, &timeo, atomic_read(&pn->tx_credits)); done = sk_wait_event(sk, &timeo, atomic_read(&pn->tx_credits));
finish_wait(&sk->sk_socket->wait, &wait); finish_wait(sk_sleep(sk), &wait);
if (sk->sk_state != TCP_ESTABLISHED) if (sk->sk_state != TCP_ESTABLISHED)
goto disabled; goto disabled;
......
...@@ -265,7 +265,7 @@ static unsigned int pn_socket_poll(struct file *file, struct socket *sock, ...@@ -265,7 +265,7 @@ static unsigned int pn_socket_poll(struct file *file, struct socket *sock,
struct pep_sock *pn = pep_sk(sk); struct pep_sock *pn = pep_sk(sk);
unsigned int mask = 0; unsigned int mask = 0;
poll_wait(file, &sock->wait, wait); poll_wait(file, sk_sleep(sk), wait);
switch (sk->sk_state) { switch (sk->sk_state) {
case TCP_LISTEN: case TCP_LISTEN:
......
...@@ -62,13 +62,15 @@ static inline int rxrpc_writable(struct sock *sk) ...@@ -62,13 +62,15 @@ static inline int rxrpc_writable(struct sock *sk)
static void rxrpc_write_space(struct sock *sk) static void rxrpc_write_space(struct sock *sk)
{ {
_enter("%p", sk); _enter("%p", sk);
read_lock(&sk->sk_callback_lock); rcu_read_lock();
if (rxrpc_writable(sk)) { if (rxrpc_writable(sk)) {
if (sk_has_sleeper(sk)) struct socket_wq *wq = rcu_dereference(sk->sk_wq);
wake_up_interruptible(sk_sleep(sk));
if (wq_has_sleeper(wq))
wake_up_interruptible(&wq->wait);
sk_wake_async(sk, SOCK_WAKE_SPACE, POLL_OUT); sk_wake_async(sk, SOCK_WAKE_SPACE, POLL_OUT);
} }
read_unlock(&sk->sk_callback_lock); rcu_read_unlock();
} }
/* /*
......
...@@ -6065,7 +6065,7 @@ static void __sctp_write_space(struct sctp_association *asoc) ...@@ -6065,7 +6065,7 @@ static void __sctp_write_space(struct sctp_association *asoc)
* here by modeling from the current TCP/UDP code. * here by modeling from the current TCP/UDP code.
* We have not tested with it yet. * We have not tested with it yet.
*/ */
if (sock->fasync_list && if (sock->wq->fasync_list &&
!(sk->sk_shutdown & SEND_SHUTDOWN)) !(sk->sk_shutdown & SEND_SHUTDOWN))
sock_wake_async(sock, sock_wake_async(sock,
SOCK_WAKE_SPACE, POLL_OUT); SOCK_WAKE_SPACE, POLL_OUT);
......
...@@ -252,9 +252,14 @@ static struct inode *sock_alloc_inode(struct super_block *sb) ...@@ -252,9 +252,14 @@ static struct inode *sock_alloc_inode(struct super_block *sb)
ei = kmem_cache_alloc(sock_inode_cachep, GFP_KERNEL); ei = kmem_cache_alloc(sock_inode_cachep, GFP_KERNEL);
if (!ei) if (!ei)
return NULL; return NULL;
init_waitqueue_head(&ei->socket.wait); ei->socket.wq = kmalloc(sizeof(struct socket_wq), GFP_KERNEL);
if (!ei->socket.wq) {
kmem_cache_free(sock_inode_cachep, ei);
return NULL;
}
init_waitqueue_head(&ei->socket.wq->wait);
ei->socket.wq->fasync_list = NULL;
ei->socket.fasync_list = NULL;
ei->socket.state = SS_UNCONNECTED; ei->socket.state = SS_UNCONNECTED;
ei->socket.flags = 0; ei->socket.flags = 0;
ei->socket.ops = NULL; ei->socket.ops = NULL;
...@@ -264,10 +269,21 @@ static struct inode *sock_alloc_inode(struct super_block *sb) ...@@ -264,10 +269,21 @@ static struct inode *sock_alloc_inode(struct super_block *sb)
return &ei->vfs_inode; return &ei->vfs_inode;
} }
static void wq_free_rcu(struct rcu_head *head)
{
struct socket_wq *wq = container_of(head, struct socket_wq, rcu);
kfree(wq);
}
static void sock_destroy_inode(struct inode *inode) static void sock_destroy_inode(struct inode *inode)
{ {
kmem_cache_free(sock_inode_cachep, struct socket_alloc *ei;
container_of(inode, struct socket_alloc, vfs_inode));
ei = container_of(inode, struct socket_alloc, vfs_inode);
call_rcu(&ei->socket.wq->rcu, wq_free_rcu);
kmem_cache_free(sock_inode_cachep, ei);
} }
static void init_once(void *foo) static void init_once(void *foo)
...@@ -513,7 +529,7 @@ void sock_release(struct socket *sock) ...@@ -513,7 +529,7 @@ void sock_release(struct socket *sock)
module_put(owner); module_put(owner);
} }
if (sock->fasync_list) if (sock->wq->fasync_list)
printk(KERN_ERR "sock_release: fasync list not empty!\n"); printk(KERN_ERR "sock_release: fasync list not empty!\n");
percpu_sub(sockets_in_use, 1); percpu_sub(sockets_in_use, 1);
...@@ -1080,9 +1096,9 @@ static int sock_fasync(int fd, struct file *filp, int on) ...@@ -1080,9 +1096,9 @@ static int sock_fasync(int fd, struct file *filp, int on)
lock_sock(sk); lock_sock(sk);
fasync_helper(fd, filp, on, &sock->fasync_list); fasync_helper(fd, filp, on, &sock->wq->fasync_list);
if (!sock->fasync_list) if (!sock->wq->fasync_list)
sock_reset_flag(sk, SOCK_FASYNC); sock_reset_flag(sk, SOCK_FASYNC);
else else
sock_set_flag(sk, SOCK_FASYNC); sock_set_flag(sk, SOCK_FASYNC);
...@@ -1091,12 +1107,20 @@ static int sock_fasync(int fd, struct file *filp, int on) ...@@ -1091,12 +1107,20 @@ static int sock_fasync(int fd, struct file *filp, int on)
return 0; return 0;
} }
/* This function may be called only under socket lock or callback_lock */ /* This function may be called only under socket lock or callback_lock or rcu_lock */
int sock_wake_async(struct socket *sock, int how, int band) int sock_wake_async(struct socket *sock, int how, int band)
{ {
if (!sock || !sock->fasync_list) struct socket_wq *wq;
if (!sock)
return -1;
rcu_read_lock();
wq = rcu_dereference(sock->wq);
if (!wq || !wq->fasync_list) {
rcu_read_unlock();
return -1; return -1;
}
switch (how) { switch (how) {
case SOCK_WAKE_WAITD: case SOCK_WAKE_WAITD:
if (test_bit(SOCK_ASYNC_WAITDATA, &sock->flags)) if (test_bit(SOCK_ASYNC_WAITDATA, &sock->flags))
...@@ -1108,11 +1132,12 @@ int sock_wake_async(struct socket *sock, int how, int band) ...@@ -1108,11 +1132,12 @@ int sock_wake_async(struct socket *sock, int how, int band)
/* fall through */ /* fall through */
case SOCK_WAKE_IO: case SOCK_WAKE_IO:
call_kill: call_kill:
kill_fasync(&sock->fasync_list, SIGIO, band); kill_fasync(&wq->fasync_list, SIGIO, band);
break; break;
case SOCK_WAKE_URG: case SOCK_WAKE_URG:
kill_fasync(&sock->fasync_list, SIGURG, band); kill_fasync(&wq->fasync_list, SIGURG, band);
} }
rcu_read_unlock();
return 0; return 0;
} }
......
...@@ -313,13 +313,16 @@ static inline int unix_writable(struct sock *sk) ...@@ -313,13 +313,16 @@ static inline int unix_writable(struct sock *sk)
static void unix_write_space(struct sock *sk) static void unix_write_space(struct sock *sk)
{ {
read_lock(&sk->sk_callback_lock); struct socket_wq *wq;
rcu_read_lock();
if (unix_writable(sk)) { if (unix_writable(sk)) {
if (sk_has_sleeper(sk)) wq = rcu_dereference(sk->sk_wq);
wake_up_interruptible_sync(sk_sleep(sk)); if (wq_has_sleeper(wq))
wake_up_interruptible_sync(&wq->wait);
sk_wake_async(sk, SOCK_WAKE_SPACE, POLL_OUT); sk_wake_async(sk, SOCK_WAKE_SPACE, POLL_OUT);
} }
read_unlock(&sk->sk_callback_lock); rcu_read_unlock();
} }
/* When dgram socket disconnects (or changes its peer), we clear its receive /* When dgram socket disconnects (or changes its peer), we clear its receive
...@@ -406,9 +409,7 @@ static int unix_release_sock(struct sock *sk, int embrion) ...@@ -406,9 +409,7 @@ static int unix_release_sock(struct sock *sk, int embrion)
skpair->sk_err = ECONNRESET; skpair->sk_err = ECONNRESET;
unix_state_unlock(skpair); unix_state_unlock(skpair);
skpair->sk_state_change(skpair); skpair->sk_state_change(skpair);
read_lock(&skpair->sk_callback_lock);
sk_wake_async(skpair, SOCK_WAKE_WAITD, POLL_HUP); sk_wake_async(skpair, SOCK_WAKE_WAITD, POLL_HUP);
read_unlock(&skpair->sk_callback_lock);
} }
sock_put(skpair); /* It may now die */ sock_put(skpair); /* It may now die */
unix_peer(sk) = NULL; unix_peer(sk) = NULL;
...@@ -1142,7 +1143,7 @@ static int unix_stream_connect(struct socket *sock, struct sockaddr *uaddr, ...@@ -1142,7 +1143,7 @@ static int unix_stream_connect(struct socket *sock, struct sockaddr *uaddr,
newsk->sk_peercred.pid = task_tgid_vnr(current); newsk->sk_peercred.pid = task_tgid_vnr(current);
current_euid_egid(&newsk->sk_peercred.uid, &newsk->sk_peercred.gid); current_euid_egid(&newsk->sk_peercred.uid, &newsk->sk_peercred.gid);
newu = unix_sk(newsk); newu = unix_sk(newsk);
newsk->sk_sleep = &newu->peer_wait; newsk->sk_wq = &newu->peer_wq;
otheru = unix_sk(other); otheru = unix_sk(other);
/* copy address information from listening to new sock*/ /* copy address information from listening to new sock*/
...@@ -1931,12 +1932,10 @@ static int unix_shutdown(struct socket *sock, int mode) ...@@ -1931,12 +1932,10 @@ static int unix_shutdown(struct socket *sock, int mode)
other->sk_shutdown |= peer_mode; other->sk_shutdown |= peer_mode;
unix_state_unlock(other); unix_state_unlock(other);
other->sk_state_change(other); other->sk_state_change(other);
read_lock(&other->sk_callback_lock);
if (peer_mode == SHUTDOWN_MASK) if (peer_mode == SHUTDOWN_MASK)
sk_wake_async(other, SOCK_WAKE_WAITD, POLL_HUP); sk_wake_async(other, SOCK_WAKE_WAITD, POLL_HUP);
else if (peer_mode & RCV_SHUTDOWN) else if (peer_mode & RCV_SHUTDOWN)
sk_wake_async(other, SOCK_WAKE_WAITD, POLL_IN); sk_wake_async(other, SOCK_WAKE_WAITD, POLL_IN);
read_unlock(&other->sk_callback_lock);
} }
if (other) if (other)
sock_put(other); sock_put(other);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment