Commit 3f26ec10 authored by Trond Myklebust's avatar Trond Myklebust

Add the sk->callback_lock spinlocks to the RPC socket callbacks

in order to protect the socket from being released by one
CPU while the other is in a soft interrupt.
parent f4084744
...@@ -377,6 +377,7 @@ xprt_close(struct rpc_xprt *xprt) ...@@ -377,6 +377,7 @@ xprt_close(struct rpc_xprt *xprt)
if (!sk) if (!sk)
return; return;
write_lock_bh(&sk->callback_lock);
xprt->inet = NULL; xprt->inet = NULL;
xprt->sock = NULL; xprt->sock = NULL;
...@@ -384,6 +385,7 @@ xprt_close(struct rpc_xprt *xprt) ...@@ -384,6 +385,7 @@ xprt_close(struct rpc_xprt *xprt)
sk->data_ready = xprt->old_data_ready; sk->data_ready = xprt->old_data_ready;
sk->state_change = xprt->old_state_change; sk->state_change = xprt->old_state_change;
sk->write_space = xprt->old_write_space; sk->write_space = xprt->old_write_space;
write_unlock_bh(&sk->callback_lock);
xprt_disconnect(xprt); xprt_disconnect(xprt);
sk->no_check = 0; sk->no_check = 0;
...@@ -683,6 +685,7 @@ udp_data_ready(struct sock *sk, int len) ...@@ -683,6 +685,7 @@ udp_data_ready(struct sock *sk, int len)
struct sk_buff *skb; struct sk_buff *skb;
int err, repsize, copied; int err, repsize, copied;
read_lock(&sk->callback_lock);
dprintk("RPC: udp_data_ready...\n"); dprintk("RPC: udp_data_ready...\n");
if (!(xprt = xprt_from_sock(sk))) { if (!(xprt = xprt_from_sock(sk))) {
printk("RPC: udp_data_ready request not found!\n"); printk("RPC: udp_data_ready request not found!\n");
...@@ -733,6 +736,7 @@ udp_data_ready(struct sock *sk, int len) ...@@ -733,6 +736,7 @@ udp_data_ready(struct sock *sk, int len)
out: out:
if (sk->sleep && waitqueue_active(sk->sleep)) if (sk->sleep && waitqueue_active(sk->sleep))
wake_up_interruptible(sk->sleep); wake_up_interruptible(sk->sleep);
read_unlock(&sk->callback_lock);
} }
/* /*
...@@ -937,18 +941,21 @@ static void tcp_data_ready(struct sock *sk, int bytes) ...@@ -937,18 +941,21 @@ static void tcp_data_ready(struct sock *sk, int bytes)
struct rpc_xprt *xprt; struct rpc_xprt *xprt;
read_descriptor_t rd_desc; read_descriptor_t rd_desc;
read_lock(&sk->callback_lock);
dprintk("RPC: tcp_data_ready...\n"); dprintk("RPC: tcp_data_ready...\n");
if (!(xprt = xprt_from_sock(sk))) { if (!(xprt = xprt_from_sock(sk))) {
printk("RPC: tcp_data_ready socket info not found!\n"); printk("RPC: tcp_data_ready socket info not found!\n");
return; goto out;
} }
if (xprt->shutdown) if (xprt->shutdown)
return; goto out;
/* We use rd_desc to pass struct xprt to tcp_data_recv */ /* We use rd_desc to pass struct xprt to tcp_data_recv */
rd_desc.buf = (char *)xprt; rd_desc.buf = (char *)xprt;
rd_desc.count = 65536; rd_desc.count = 65536;
tcp_read_sock(sk, &rd_desc, tcp_data_recv); tcp_read_sock(sk, &rd_desc, tcp_data_recv);
out:
read_unlock(&sk->callback_lock);
} }
static void static void
...@@ -956,6 +963,7 @@ tcp_state_change(struct sock *sk) ...@@ -956,6 +963,7 @@ tcp_state_change(struct sock *sk)
{ {
struct rpc_xprt *xprt; struct rpc_xprt *xprt;
read_lock(&sk->callback_lock);
if (!(xprt = xprt_from_sock(sk))) if (!(xprt = xprt_from_sock(sk)))
goto out; goto out;
dprintk("RPC: tcp_state_change client %p...\n", xprt); dprintk("RPC: tcp_state_change client %p...\n", xprt);
...@@ -989,6 +997,7 @@ tcp_state_change(struct sock *sk) ...@@ -989,6 +997,7 @@ tcp_state_change(struct sock *sk)
out: out:
if (sk->sleep && waitqueue_active(sk->sleep)) if (sk->sleep && waitqueue_active(sk->sleep))
wake_up_interruptible_all(sk->sleep); wake_up_interruptible_all(sk->sleep);
read_unlock(&sk->callback_lock);
} }
/* /*
...@@ -1003,24 +1012,25 @@ xprt_write_space(struct sock *sk) ...@@ -1003,24 +1012,25 @@ xprt_write_space(struct sock *sk)
struct rpc_xprt *xprt; struct rpc_xprt *xprt;
struct socket *sock; struct socket *sock;
read_lock(&sk->callback_lock);
if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->socket)) if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->socket))
return; goto out;
if (xprt->shutdown) if (xprt->shutdown)
return; goto out;
/* Wait until we have enough socket memory */ /* Wait until we have enough socket memory */
if (xprt->stream) { if (xprt->stream) {
/* from net/ipv4/tcp.c:tcp_write_space */ /* from net/ipv4/tcp.c:tcp_write_space */
if (tcp_wspace(sk) < tcp_min_write_space(sk)) if (tcp_wspace(sk) < tcp_min_write_space(sk))
return; goto out;
} else { } else {
/* from net/core/sock.c:sock_def_write_space */ /* from net/core/sock.c:sock_def_write_space */
if (!sock_writeable(sk)) if (!sock_writeable(sk))
return; goto out;
} }
if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)) if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
return; goto out;
spin_lock_bh(&xprt->sock_lock); spin_lock_bh(&xprt->sock_lock);
if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending) if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
...@@ -1028,6 +1038,8 @@ xprt_write_space(struct sock *sk) ...@@ -1028,6 +1038,8 @@ xprt_write_space(struct sock *sk)
spin_unlock_bh(&xprt->sock_lock); spin_unlock_bh(&xprt->sock_lock);
if (sk->sleep && waitqueue_active(sk->sleep)) if (sk->sleep && waitqueue_active(sk->sleep))
wake_up_interruptible(sk->sleep); wake_up_interruptible(sk->sleep);
out:
read_unlock(&sk->callback_lock);
} }
/* /*
...@@ -1459,6 +1471,7 @@ xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock) ...@@ -1459,6 +1471,7 @@ xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
if (xprt->inet) if (xprt->inet)
return; return;
write_lock_bh(&sk->callback_lock);
sk->user_data = xprt; sk->user_data = xprt;
xprt->old_data_ready = sk->data_ready; xprt->old_data_ready = sk->data_ready;
xprt->old_state_change = sk->state_change; xprt->old_state_change = sk->state_change;
...@@ -1479,6 +1492,7 @@ xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock) ...@@ -1479,6 +1492,7 @@ xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
/* Reset to new socket */ /* Reset to new socket */
xprt->sock = sock; xprt->sock = sock;
xprt->inet = sk; xprt->inet = sk;
write_unlock_bh(&sk->callback_lock);
return; return;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment