Commit 3db6e0d1 authored by Sowmini Varadhan's avatar Sowmini Varadhan Committed by David S. Miller

rds: use RCU to synchronize work-enqueue with connection teardown

rds_sendmsg() can enqueue work on cp_send_w from process context, but
it should not enqueue this work if connection teardown  has commenced
(else we risk enquing work after rds_conn_path_destroy() has assumed that
all work has been cancelled/flushed).

Similarly some other functions like rds_cong_queue_updates
and rds_tcp_data_ready are called in softirq context, and may end
up enqueuing work on rds_wq after rds_conn_path_destroy() has assumed
that all workqs are quiesced.

Check the RDS_DESTROY_PENDING bit and use rcu synchronization to avoid
all these races.
Signed-off-by: default avatarSowmini Varadhan <sowmini.varadhan@oracle.com>
Acked-by: default avatarSantosh Shilimkar <santosh.shilimkar@oracle.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent c90ecbfa
...@@ -219,7 +219,11 @@ void rds_cong_queue_updates(struct rds_cong_map *map) ...@@ -219,7 +219,11 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
spin_lock_irqsave(&rds_cong_lock, flags); spin_lock_irqsave(&rds_cong_lock, flags);
list_for_each_entry(conn, &map->m_conn_list, c_map_item) { list_for_each_entry(conn, &map->m_conn_list, c_map_item) {
if (!test_and_set_bit(0, &conn->c_map_queued)) { struct rds_conn_path *cp = &conn->c_path[0];
rcu_read_lock();
if (!test_and_set_bit(0, &conn->c_map_queued) &&
!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
rds_stats_inc(s_cong_update_queued); rds_stats_inc(s_cong_update_queued);
/* We cannot inline the call to rds_send_xmit() here /* We cannot inline the call to rds_send_xmit() here
* for two reasons (both pertaining to a TCP transport): * for two reasons (both pertaining to a TCP transport):
...@@ -235,9 +239,9 @@ void rds_cong_queue_updates(struct rds_cong_map *map) ...@@ -235,9 +239,9 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
* therefore trigger warnings. * therefore trigger warnings.
* Defer the xmit to rds_send_worker() instead. * Defer the xmit to rds_send_worker() instead.
*/ */
queue_delayed_work(rds_wq, queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
&conn->c_path[0].cp_send_w, 0);
} }
rcu_read_unlock();
} }
spin_unlock_irqrestore(&rds_cong_lock, flags); spin_unlock_irqrestore(&rds_cong_lock, flags);
......
...@@ -366,8 +366,6 @@ void rds_conn_shutdown(struct rds_conn_path *cp) ...@@ -366,8 +366,6 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
* to the conn hash, so we never trigger a reconnect on this * to the conn hash, so we never trigger a reconnect on this
* conn - the reconnect is always triggered by the active peer. */ * conn - the reconnect is always triggered by the active peer. */
cancel_delayed_work_sync(&cp->cp_conn_w); cancel_delayed_work_sync(&cp->cp_conn_w);
if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
return;
rcu_read_lock(); rcu_read_lock();
if (!hlist_unhashed(&conn->c_hash_node)) { if (!hlist_unhashed(&conn->c_hash_node)) {
rcu_read_unlock(); rcu_read_unlock();
...@@ -390,6 +388,7 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp) ...@@ -390,6 +388,7 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
return; return;
/* make sure lingering queued work won't try to ref the conn */ /* make sure lingering queued work won't try to ref the conn */
synchronize_rcu();
cancel_delayed_work_sync(&cp->cp_send_w); cancel_delayed_work_sync(&cp->cp_send_w);
cancel_delayed_work_sync(&cp->cp_recv_w); cancel_delayed_work_sync(&cp->cp_recv_w);
...@@ -407,6 +406,11 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp) ...@@ -407,6 +406,11 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
if (cp->cp_xmit_rm) if (cp->cp_xmit_rm)
rds_message_put(cp->cp_xmit_rm); rds_message_put(cp->cp_xmit_rm);
WARN_ON(delayed_work_pending(&cp->cp_send_w));
WARN_ON(delayed_work_pending(&cp->cp_recv_w));
WARN_ON(delayed_work_pending(&cp->cp_conn_w));
WARN_ON(work_pending(&cp->cp_down_w));
cp->cp_conn->c_trans->conn_free(cp->cp_transport_data); cp->cp_conn->c_trans->conn_free(cp->cp_transport_data);
} }
...@@ -686,10 +690,13 @@ void rds_conn_path_drop(struct rds_conn_path *cp, bool destroy) ...@@ -686,10 +690,13 @@ void rds_conn_path_drop(struct rds_conn_path *cp, bool destroy)
{ {
atomic_set(&cp->cp_state, RDS_CONN_ERROR); atomic_set(&cp->cp_state, RDS_CONN_ERROR);
if (!destroy && test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) rcu_read_lock();
if (!destroy && test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
rcu_read_unlock();
return; return;
}
queue_work(rds_wq, &cp->cp_down_w); queue_work(rds_wq, &cp->cp_down_w);
rcu_read_unlock();
} }
EXPORT_SYMBOL_GPL(rds_conn_path_drop); EXPORT_SYMBOL_GPL(rds_conn_path_drop);
...@@ -706,9 +713,15 @@ EXPORT_SYMBOL_GPL(rds_conn_drop); ...@@ -706,9 +713,15 @@ EXPORT_SYMBOL_GPL(rds_conn_drop);
*/ */
void rds_conn_path_connect_if_down(struct rds_conn_path *cp) void rds_conn_path_connect_if_down(struct rds_conn_path *cp)
{ {
rcu_read_lock();
if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
rcu_read_unlock();
return;
}
if (rds_conn_path_state(cp) == RDS_CONN_DOWN && if (rds_conn_path_state(cp) == RDS_CONN_DOWN &&
!test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags)) !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags))
queue_delayed_work(rds_wq, &cp->cp_conn_w, 0); queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
rcu_read_unlock();
} }
EXPORT_SYMBOL_GPL(rds_conn_path_connect_if_down); EXPORT_SYMBOL_GPL(rds_conn_path_connect_if_down);
......
...@@ -162,6 +162,12 @@ int rds_send_xmit(struct rds_conn_path *cp) ...@@ -162,6 +162,12 @@ int rds_send_xmit(struct rds_conn_path *cp)
goto out; goto out;
} }
if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
release_in_xmit(cp);
ret = -ENETUNREACH; /* dont requeue send work */
goto out;
}
/* /*
* we record the send generation after doing the xmit acquire. * we record the send generation after doing the xmit acquire.
* if someone else manages to jump in and do some work, we'll use * if someone else manages to jump in and do some work, we'll use
...@@ -437,7 +443,12 @@ int rds_send_xmit(struct rds_conn_path *cp) ...@@ -437,7 +443,12 @@ int rds_send_xmit(struct rds_conn_path *cp)
!list_empty(&cp->cp_send_queue)) && !raced) { !list_empty(&cp->cp_send_queue)) && !raced) {
if (batch_count < send_batch_count) if (batch_count < send_batch_count)
goto restart; goto restart;
queue_delayed_work(rds_wq, &cp->cp_send_w, 1); rcu_read_lock();
if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
ret = -ENETUNREACH;
else
queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
rcu_read_unlock();
} else if (raced) { } else if (raced) {
rds_stats_inc(s_send_lock_queue_raced); rds_stats_inc(s_send_lock_queue_raced);
} }
...@@ -1151,6 +1162,11 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) ...@@ -1151,6 +1162,11 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
else else
cpath = &conn->c_path[0]; cpath = &conn->c_path[0];
if (test_bit(RDS_DESTROY_PENDING, &cpath->cp_flags)) {
ret = -EAGAIN;
goto out;
}
rds_conn_path_connect_if_down(cpath); rds_conn_path_connect_if_down(cpath);
ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs); ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
...@@ -1190,9 +1206,17 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) ...@@ -1190,9 +1206,17 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
rds_stats_inc(s_send_queued); rds_stats_inc(s_send_queued);
ret = rds_send_xmit(cpath); ret = rds_send_xmit(cpath);
if (ret == -ENOMEM || ret == -EAGAIN) if (ret == -ENOMEM || ret == -EAGAIN) {
queue_delayed_work(rds_wq, &cpath->cp_send_w, 1); ret = 0;
rcu_read_lock();
if (test_bit(RDS_DESTROY_PENDING, &cpath->cp_flags))
ret = -ENETUNREACH;
else
queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
rcu_read_unlock();
}
if (ret)
goto out;
rds_message_put(rm); rds_message_put(rm);
return payload_len; return payload_len;
...@@ -1270,7 +1294,10 @@ rds_send_probe(struct rds_conn_path *cp, __be16 sport, ...@@ -1270,7 +1294,10 @@ rds_send_probe(struct rds_conn_path *cp, __be16 sport,
rds_stats_inc(s_send_pong); rds_stats_inc(s_send_pong);
/* schedule the send work on rds_wq */ /* schedule the send work on rds_wq */
queue_delayed_work(rds_wq, &cp->cp_send_w, 1); rcu_read_lock();
if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
rcu_read_unlock();
rds_message_put(rm); rds_message_put(rm);
return 0; return 0;
......
...@@ -321,8 +321,12 @@ void rds_tcp_data_ready(struct sock *sk) ...@@ -321,8 +321,12 @@ void rds_tcp_data_ready(struct sock *sk)
ready = tc->t_orig_data_ready; ready = tc->t_orig_data_ready;
rds_tcp_stats_inc(s_tcp_data_ready_calls); rds_tcp_stats_inc(s_tcp_data_ready_calls);
if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) {
queue_delayed_work(rds_wq, &cp->cp_recv_w, 0); rcu_read_lock();
if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
rcu_read_unlock();
}
out: out:
read_unlock_bh(&sk->sk_callback_lock); read_unlock_bh(&sk->sk_callback_lock);
ready(sk); ready(sk);
......
...@@ -202,8 +202,11 @@ void rds_tcp_write_space(struct sock *sk) ...@@ -202,8 +202,11 @@ void rds_tcp_write_space(struct sock *sk)
tc->t_last_seen_una = rds_tcp_snd_una(tc); tc->t_last_seen_una = rds_tcp_snd_una(tc);
rds_send_path_drop_acked(cp, rds_tcp_snd_una(tc), rds_tcp_is_acked); rds_send_path_drop_acked(cp, rds_tcp_snd_una(tc), rds_tcp_is_acked);
if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf) rcu_read_lock();
if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf &&
!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
queue_delayed_work(rds_wq, &cp->cp_send_w, 0); queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
rcu_read_unlock();
out: out:
read_unlock_bh(&sk->sk_callback_lock); read_unlock_bh(&sk->sk_callback_lock);
......
...@@ -87,8 +87,12 @@ void rds_connect_path_complete(struct rds_conn_path *cp, int curr) ...@@ -87,8 +87,12 @@ void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
cp->cp_reconnect_jiffies = 0; cp->cp_reconnect_jiffies = 0;
set_bit(0, &cp->cp_conn->c_map_queued); set_bit(0, &cp->cp_conn->c_map_queued);
queue_delayed_work(rds_wq, &cp->cp_send_w, 0); rcu_read_lock();
queue_delayed_work(rds_wq, &cp->cp_recv_w, 0); if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
}
rcu_read_unlock();
} }
EXPORT_SYMBOL_GPL(rds_connect_path_complete); EXPORT_SYMBOL_GPL(rds_connect_path_complete);
...@@ -133,7 +137,10 @@ void rds_queue_reconnect(struct rds_conn_path *cp) ...@@ -133,7 +137,10 @@ void rds_queue_reconnect(struct rds_conn_path *cp)
set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags); set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
if (cp->cp_reconnect_jiffies == 0) { if (cp->cp_reconnect_jiffies == 0) {
cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies; cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
queue_delayed_work(rds_wq, &cp->cp_conn_w, 0); rcu_read_lock();
if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
rcu_read_unlock();
return; return;
} }
...@@ -141,8 +148,11 @@ void rds_queue_reconnect(struct rds_conn_path *cp) ...@@ -141,8 +148,11 @@ void rds_queue_reconnect(struct rds_conn_path *cp)
rdsdebug("%lu delay %lu ceil conn %p for %pI4 -> %pI4\n", rdsdebug("%lu delay %lu ceil conn %p for %pI4 -> %pI4\n",
rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies, rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies,
conn, &conn->c_laddr, &conn->c_faddr); conn, &conn->c_laddr, &conn->c_faddr);
queue_delayed_work(rds_wq, &cp->cp_conn_w, rcu_read_lock();
rand % cp->cp_reconnect_jiffies); if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
queue_delayed_work(rds_wq, &cp->cp_conn_w,
rand % cp->cp_reconnect_jiffies);
rcu_read_unlock();
cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2, cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2,
rds_sysctl_reconnect_max_jiffies); rds_sysctl_reconnect_max_jiffies);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment