Commit 29a36611 authored by David S. Miller

Merge branch 'rds-packet-assembly-fixes'

Sowmini Varadhan says:

====================
RDS: TCP: socket locking RDS packet assembly fixes

This three-part patchset fixes bugs in synchronization between
rds_tcp_accept_one() and the rds-tcp send/recv path.

Patch 1 ensures that lock_sock() is taken appropriately
and that the RDS datagram reassembly state is reset to synchronize
with the receive path.

Patch 2 ensures that partially sent RDS datagrams will get
retransmitted after rds_tcp_accept_one() switches sockets.

Patch 3 fixes a race window which would prematurely re-enable
rds_send_xmit() before the rds_tcp_connection setup has been
completed in rds_tcp_accept_one().
====================
Signed-off-by: David S. Miller <davem@davemloft.net>
parents 80e509db 9c79440e
...@@ -74,6 +74,7 @@ enum { ...@@ -74,6 +74,7 @@ enum {
RDS_CONN_CONNECTING, RDS_CONN_CONNECTING,
RDS_CONN_DISCONNECTING, RDS_CONN_DISCONNECTING,
RDS_CONN_UP, RDS_CONN_UP,
RDS_CONN_RESETTING,
RDS_CONN_ERROR, RDS_CONN_ERROR,
}; };
...@@ -813,6 +814,7 @@ void rds_connect_worker(struct work_struct *); ...@@ -813,6 +814,7 @@ void rds_connect_worker(struct work_struct *);
void rds_shutdown_worker(struct work_struct *); void rds_shutdown_worker(struct work_struct *);
void rds_send_worker(struct work_struct *); void rds_send_worker(struct work_struct *);
void rds_recv_worker(struct work_struct *); void rds_recv_worker(struct work_struct *);
void rds_connect_path_complete(struct rds_connection *conn, int curr);
void rds_connect_complete(struct rds_connection *conn); void rds_connect_complete(struct rds_connection *conn);
/* transport.c */ /* transport.c */
......
...@@ -99,6 +99,7 @@ void rds_send_reset(struct rds_connection *conn) ...@@ -99,6 +99,7 @@ void rds_send_reset(struct rds_connection *conn)
list_splice_init(&conn->c_retrans, &conn->c_send_queue); list_splice_init(&conn->c_retrans, &conn->c_send_queue);
spin_unlock_irqrestore(&conn->c_lock, flags); spin_unlock_irqrestore(&conn->c_lock, flags);
} }
EXPORT_SYMBOL_GPL(rds_send_reset);
static int acquire_in_xmit(struct rds_connection *conn) static int acquire_in_xmit(struct rds_connection *conn)
{ {
......
...@@ -126,9 +126,81 @@ void rds_tcp_restore_callbacks(struct socket *sock, ...@@ -126,9 +126,81 @@ void rds_tcp_restore_callbacks(struct socket *sock,
} }
/* /*
* This is the only path that sets tc->t_sock. Send and receive trust that * rds_tcp_reset_callbacks() switches to the new sock and
* it is set. The RDS_CONN_UP bit protects those paths from being * releases the existing tc->t_sock.
* called while it isn't set. *
* The only functions that set tc->t_sock are rds_tcp_set_callbacks
* and rds_tcp_reset_callbacks. Send and receive trust that
* it is set. The absence of RDS_CONN_UP bit protects those paths
* from being called while it isn't set.
*/
/* Replace the socket attached to conn's TCP transport data with sock.
 *
 * If a socket is already attached (tc->t_sock), it is detached first:
 * senders are quiesced, the receive reassembly state is reset, the
 * socket's original callbacks are restored and the socket is released
 * with sock_release().  The function returns nothing; the old socket is
 * not handed back to the caller.
 *
 * @sock: the new socket to install as tc->t_sock
 * @conn: the connection whose c_transport_data is being re-pointed
 */
void rds_tcp_reset_callbacks(struct socket *sock,
struct rds_connection *conn)
{
struct rds_tcp_connection *tc = conn->c_transport_data;
struct socket *osock = tc->t_sock;
/* No socket attached yet: nothing to tear down, go install sock. */
if (!osock)
goto newsock;
/* Need to resolve a duelling SYN between peers.
* We have an outstanding SYN to this peer, which may
* potentially have transitioned to the RDS_CONN_UP state,
* so we must quiesce any send threads before resetting
* c_transport_data. We quiesce these threads by setting
* c_state to something other than RDS_CONN_UP, and then
* waiting for any existing threads in rds_send_xmit to
* complete release_in_xmit(). (Subsequent threads entering
* rds_send_xmit() will bail on !rds_conn_up().)
*
* However an incoming syn-ack at this point would end up
* marking the conn as RDS_CONN_UP, and would again permit
* rds_send_xmit() threads through, so ideally we would
* synchronize on RDS_CONN_UP after lock_sock(), but cannot
* do that: waiting on !RDS_IN_XMIT after lock_sock() may
* end up deadlocking with tcp_sendmsg(), and the RDS_IN_XMIT
* would not get set. As a result, we set c_state to
* RDS_CONN_RESETTING, to ensure that rds_tcp_state_change
* cannot mark rds_conn_path_up() in the window before lock_sock()
*/
atomic_set(&conn->c_state, RDS_CONN_RESETTING);
/* Block until no thread holds RDS_IN_XMIT on this connection. */
wait_event(conn->c_waitq, !test_bit(RDS_IN_XMIT, &conn->c_flags));
lock_sock(osock->sk);
/* reset receive side state for rds_tcp_data_recv() for osock */
/* Drop the reference on any in-progress incoming message (t_tinc);
* presumably a partially reassembled datagram - confirm against
* rds_tcp_data_recv().
*/
if (tc->t_tinc) {
rds_inc_put(&tc->t_tinc->ti_inc);
tc->t_tinc = NULL;
}
/* Expect a full RDS header next and no pending payload bytes. */
tc->t_tinc_hdr_rem = sizeof(struct rds_header);
tc->t_tinc_data_rem = 0;
tc->t_sock = NULL;
/* Detach conn from the old socket and put back the callbacks that
* were saved in tc->t_orig_*, all under sk_callback_lock.
*/
write_lock_bh(&osock->sk->sk_callback_lock);
osock->sk->sk_user_data = NULL;
osock->sk->sk_data_ready = tc->t_orig_data_ready;
osock->sk->sk_write_space = tc->t_orig_write_space;
osock->sk->sk_state_change = tc->t_orig_state_change;
write_unlock_bh(&osock->sk->sk_callback_lock);
release_sock(osock->sk);
/* The old socket is no longer referenced by tc; release it. */
sock_release(osock);
newsock:
/* Reset send-side state before the new socket is attached, so that
* transmission restarts cleanly on it (see rds_send_reset()).
*/
rds_send_reset(conn);
lock_sock(sock->sk);
/* Attach conn to the new socket and install the RDS TCP callbacks
* under sk_callback_lock.
*/
write_lock_bh(&sock->sk->sk_callback_lock);
tc->t_sock = sock;
sock->sk->sk_user_data = conn;
sock->sk->sk_data_ready = rds_tcp_data_ready;
sock->sk->sk_write_space = rds_tcp_write_space;
sock->sk->sk_state_change = rds_tcp_state_change;
write_unlock_bh(&sock->sk->sk_callback_lock);
release_sock(sock->sk);
}
/* Add tc to rds_tcp_tc_list and set tc->t_sock. See comments
* above rds_tcp_reset_callbacks for notes about synchronization
* with data path
*/ */
void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn) void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn)
{ {
......
...@@ -50,6 +50,7 @@ struct rds_tcp_statistics { ...@@ -50,6 +50,7 @@ struct rds_tcp_statistics {
void rds_tcp_tune(struct socket *sock); void rds_tcp_tune(struct socket *sock);
void rds_tcp_nonagle(struct socket *sock); void rds_tcp_nonagle(struct socket *sock);
void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn); void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn);
void rds_tcp_reset_callbacks(struct socket *sock, struct rds_connection *conn);
void rds_tcp_restore_callbacks(struct socket *sock, void rds_tcp_restore_callbacks(struct socket *sock,
struct rds_tcp_connection *tc); struct rds_tcp_connection *tc);
u32 rds_tcp_snd_nxt(struct rds_tcp_connection *tc); u32 rds_tcp_snd_nxt(struct rds_tcp_connection *tc);
......
...@@ -60,7 +60,7 @@ void rds_tcp_state_change(struct sock *sk) ...@@ -60,7 +60,7 @@ void rds_tcp_state_change(struct sock *sk)
case TCP_SYN_RECV: case TCP_SYN_RECV:
break; break;
case TCP_ESTABLISHED: case TCP_ESTABLISHED:
rds_connect_complete(conn); rds_connect_path_complete(conn, RDS_CONN_CONNECTING);
break; break;
case TCP_CLOSE_WAIT: case TCP_CLOSE_WAIT:
case TCP_CLOSE: case TCP_CLOSE:
......
...@@ -78,7 +78,6 @@ int rds_tcp_accept_one(struct socket *sock) ...@@ -78,7 +78,6 @@ int rds_tcp_accept_one(struct socket *sock)
struct inet_sock *inet; struct inet_sock *inet;
struct rds_tcp_connection *rs_tcp = NULL; struct rds_tcp_connection *rs_tcp = NULL;
int conn_state; int conn_state;
struct sock *nsk;
if (!sock) /* module unload or netns delete in progress */ if (!sock) /* module unload or netns delete in progress */
return -ENETUNREACH; return -ENETUNREACH;
...@@ -136,26 +135,21 @@ int rds_tcp_accept_one(struct socket *sock) ...@@ -136,26 +135,21 @@ int rds_tcp_accept_one(struct socket *sock)
!conn->c_outgoing) { !conn->c_outgoing) {
goto rst_nsk; goto rst_nsk;
} else { } else {
atomic_set(&conn->c_state, RDS_CONN_CONNECTING); rds_tcp_reset_callbacks(new_sock, conn);
wait_event(conn->c_waitq,
!test_bit(RDS_IN_XMIT, &conn->c_flags));
rds_tcp_restore_callbacks(rs_tcp->t_sock, rs_tcp);
conn->c_outgoing = 0; conn->c_outgoing = 0;
/* rds_connect_path_complete() marks RDS_CONN_UP */
rds_connect_path_complete(conn, RDS_CONN_DISCONNECTING);
} }
} } else {
rds_tcp_set_callbacks(new_sock, conn); rds_tcp_set_callbacks(new_sock, conn);
rds_connect_complete(conn); /* marks RDS_CONN_UP */ rds_connect_path_complete(conn, RDS_CONN_CONNECTING);
}
new_sock = NULL; new_sock = NULL;
ret = 0; ret = 0;
goto out; goto out;
rst_nsk: rst_nsk:
/* reset the newly returned accept sock and bail */ /* reset the newly returned accept sock and bail */
nsk = new_sock->sk; kernel_sock_shutdown(new_sock, SHUT_RDWR);
rds_tcp_stats_inc(s_tcp_listen_closed_stale);
nsk->sk_user_data = NULL;
nsk->sk_prot->disconnect(nsk, 0);
tcp_done(nsk);
new_sock = NULL;
ret = 0; ret = 0;
out: out:
if (rs_tcp) if (rs_tcp)
......
...@@ -71,9 +71,9 @@ ...@@ -71,9 +71,9 @@
struct workqueue_struct *rds_wq; struct workqueue_struct *rds_wq;
EXPORT_SYMBOL_GPL(rds_wq); EXPORT_SYMBOL_GPL(rds_wq);
void rds_connect_complete(struct rds_connection *conn) void rds_connect_path_complete(struct rds_connection *conn, int curr)
{ {
if (!rds_conn_transition(conn, RDS_CONN_CONNECTING, RDS_CONN_UP)) { if (!rds_conn_transition(conn, curr, RDS_CONN_UP)) {
printk(KERN_WARNING "%s: Cannot transition to state UP, " printk(KERN_WARNING "%s: Cannot transition to state UP, "
"current state is %d\n", "current state is %d\n",
__func__, __func__,
...@@ -90,6 +90,12 @@ void rds_connect_complete(struct rds_connection *conn) ...@@ -90,6 +90,12 @@ void rds_connect_complete(struct rds_connection *conn)
queue_delayed_work(rds_wq, &conn->c_send_w, 0); queue_delayed_work(rds_wq, &conn->c_send_w, 0);
queue_delayed_work(rds_wq, &conn->c_recv_w, 0); queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
} }
EXPORT_SYMBOL_GPL(rds_connect_path_complete);
/* Convenience wrapper around rds_connect_path_complete() for callers
 * whose connection is expected to be in RDS_CONN_CONNECTING; that state
 * is passed through as the "curr" argument.
 */
void rds_connect_complete(struct rds_connection *conn)
{
	const int expected_state = RDS_CONN_CONNECTING;

	rds_connect_path_complete(conn, expected_state);
}
EXPORT_SYMBOL_GPL(rds_connect_complete); EXPORT_SYMBOL_GPL(rds_connect_complete);
/* /*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment