Commit 3cec055c authored by David Howells's avatar David Howells

rxrpc: Don't hold a ref for connection workqueue

Currently, rxrpc gives the connection's work item a ref on the connection
when it queues it - and this is called from the timer expiration function.
The problem comes when queue_work() fails (ie. the work item is already
queued): the timer routine must put the ref - but this may cause the
cleanup code to run.

This has the unfortunate effect that the cleanup code may then be run in
softirq context - which means that any spinlocks it might need to touch
have to be guarded to disable softirqs (ie. they need a "_bh" suffix).

 (1) Don't give a ref to the work item.

 (2) Simplify handling of service connections by adding a separate active
     count so that the refcount isn't also used for this.

 (3) Connection destruction for both client and service connections can
     then be cleaned up by putting rxrpc_put_connection() out of line and
     making a tidy progression through the destruction code (offloaded to a
     workqueue if put from softirq or processor function context).  The RCU
     part of the cleanup then only deals with the freeing at the end.

 (4) Make rxrpc_queue_conn() return immediately if it sees the active count
     is -1 rather then queuing the connection.

 (5) Make sure that the cleanup routine waits for the work item to
     complete.

 (6) Stash the rxrpc_net pointer in the conn struct so that the rcu free
     routine can use it, even if the local endpoint has been freed.

Unfortunately, neither the timer nor the work item can simply get around
the problem by just using refcount_inc_not_zero() as the waits would still
have to be done, and there would still be the possibility of having to put
the ref in the expiration function.

Note the connection work item is mostly going to go away with the main
event work being transferred to the I/O thread, so the wait in (6) will
become obsolete.
Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
parent 3feda9d6
......@@ -112,7 +112,6 @@
EM(rxrpc_conn_get_service_conn, "GET svc-conn") \
EM(rxrpc_conn_new_client, "NEW client ") \
EM(rxrpc_conn_new_service, "NEW service ") \
EM(rxrpc_conn_put_already_queued, "PUT alreadyq") \
EM(rxrpc_conn_put_call, "PUT call ") \
EM(rxrpc_conn_put_call_input, "PUT inp-call") \
EM(rxrpc_conn_put_conn_input, "PUT inp-conn") \
......@@ -121,13 +120,13 @@
EM(rxrpc_conn_put_local_dead, "PUT loc-dead") \
EM(rxrpc_conn_put_noreuse, "PUT noreuse ") \
EM(rxrpc_conn_put_poke, "PUT poke ") \
EM(rxrpc_conn_put_service_reaped, "PUT svc-reap") \
EM(rxrpc_conn_put_unbundle, "PUT unbundle") \
EM(rxrpc_conn_put_unidle, "PUT unidle ") \
EM(rxrpc_conn_put_work, "PUT work ") \
EM(rxrpc_conn_queue_challenge, "GQ chall ") \
EM(rxrpc_conn_queue_retry_work, "GQ retry-wk") \
EM(rxrpc_conn_queue_rx_work, "GQ rx-work ") \
EM(rxrpc_conn_queue_timer, "GQ timer ") \
EM(rxrpc_conn_queue_challenge, "QUE chall ") \
EM(rxrpc_conn_queue_retry_work, "QUE retry-wk") \
EM(rxrpc_conn_queue_rx_work, "QUE rx-work ") \
EM(rxrpc_conn_queue_timer, "QUE timer ") \
EM(rxrpc_conn_see_new_service_conn, "SEE new-svc ") \
EM(rxrpc_conn_see_reap_service, "SEE reap-svc") \
E_(rxrpc_conn_see_work, "SEE work ")
......
......@@ -76,7 +76,7 @@ struct rxrpc_net {
bool kill_all_client_conns;
atomic_t nr_client_conns;
spinlock_t client_conn_cache_lock; /* Lock for ->*_client_conns */
spinlock_t client_conn_discard_lock; /* Prevent multiple discarders */
struct mutex client_conn_discard_lock; /* Prevent multiple discarders */
struct list_head idle_client_conns;
struct work_struct client_conn_reaper;
struct timer_list client_conn_reap_timer;
......@@ -432,9 +432,11 @@ struct rxrpc_connection {
struct rxrpc_conn_proto proto;
struct rxrpc_local *local; /* Representation of local endpoint */
struct rxrpc_peer *peer; /* Remote endpoint */
struct rxrpc_net *rxnet; /* Network namespace to which call belongs */
struct key *key; /* Security details */
refcount_t ref;
atomic_t active; /* Active count for service conns */
struct rcu_head rcu;
struct list_head cache_link;
......@@ -455,6 +457,7 @@ struct rxrpc_connection {
struct timer_list timer; /* Conn event timer */
struct work_struct processor; /* connection event processor */
struct work_struct destructor; /* In-process-context destroyer */
struct rxrpc_bundle *bundle; /* Client connection bundle */
struct rb_node service_node; /* Node in peer->service_conns */
struct list_head proc_link; /* link in procfs list */
......@@ -897,20 +900,20 @@ void rxrpc_process_delayed_final_acks(struct rxrpc_connection *, bool);
extern unsigned int rxrpc_connection_expiry;
extern unsigned int rxrpc_closed_conn_expiry;
struct rxrpc_connection *rxrpc_alloc_connection(gfp_t);
struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *, gfp_t);
struct rxrpc_connection *rxrpc_find_connection_rcu(struct rxrpc_local *,
struct sk_buff *,
struct rxrpc_peer **);
void __rxrpc_disconnect_call(struct rxrpc_connection *, struct rxrpc_call *);
void rxrpc_disconnect_call(struct rxrpc_call *);
void rxrpc_kill_connection(struct rxrpc_connection *);
bool rxrpc_queue_conn(struct rxrpc_connection *, enum rxrpc_conn_trace);
void rxrpc_kill_client_conn(struct rxrpc_connection *);
void rxrpc_queue_conn(struct rxrpc_connection *, enum rxrpc_conn_trace);
void rxrpc_see_connection(struct rxrpc_connection *, enum rxrpc_conn_trace);
struct rxrpc_connection *rxrpc_get_connection(struct rxrpc_connection *,
enum rxrpc_conn_trace);
struct rxrpc_connection *rxrpc_get_connection_maybe(struct rxrpc_connection *,
enum rxrpc_conn_trace);
void rxrpc_put_service_conn(struct rxrpc_connection *, enum rxrpc_conn_trace);
void rxrpc_put_connection(struct rxrpc_connection *, enum rxrpc_conn_trace);
void rxrpc_service_connection_reaper(struct work_struct *);
void rxrpc_destroy_all_connections(struct rxrpc_net *);
......@@ -924,18 +927,6 @@ static inline bool rxrpc_conn_is_service(const struct rxrpc_connection *conn)
return !rxrpc_conn_is_client(conn);
}
static inline void rxrpc_put_connection(struct rxrpc_connection *conn,
enum rxrpc_conn_trace why)
{
if (!conn)
return;
if (rxrpc_conn_is_client(conn))
rxrpc_put_client_conn(conn, why);
else
rxrpc_put_service_conn(conn, why);
}
static inline void rxrpc_reduce_conn_timer(struct rxrpc_connection *conn,
unsigned long expire_at)
{
......
......@@ -308,6 +308,7 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
rxrpc_new_incoming_connection(rx, conn, sec, skb);
} else {
rxrpc_get_connection(conn, rxrpc_conn_get_service_conn);
atomic_inc(&conn->active);
}
/* And now we can allocate and set up a new call */
......
......@@ -51,7 +51,7 @@ static void rxrpc_deactivate_bundle(struct rxrpc_bundle *bundle);
static int rxrpc_get_client_connection_id(struct rxrpc_connection *conn,
gfp_t gfp)
{
struct rxrpc_net *rxnet = conn->local->rxnet;
struct rxrpc_net *rxnet = conn->rxnet;
int id;
_enter("");
......@@ -179,7 +179,7 @@ rxrpc_alloc_client_connection(struct rxrpc_bundle *bundle, gfp_t gfp)
_enter("");
conn = rxrpc_alloc_connection(gfp);
conn = rxrpc_alloc_connection(rxnet, gfp);
if (!conn) {
_leave(" = -ENOMEM");
return ERR_PTR(-ENOMEM);
......@@ -243,7 +243,7 @@ static bool rxrpc_may_reuse_conn(struct rxrpc_connection *conn)
if (!conn)
goto dont_reuse;
rxnet = conn->local->rxnet;
rxnet = conn->rxnet;
if (test_bit(RXRPC_CONN_DONT_REUSE, &conn->flags))
goto dont_reuse;
......@@ -970,7 +970,7 @@ static void rxrpc_deactivate_bundle(struct rxrpc_bundle *bundle)
/*
* Clean up a dead client connection.
*/
static void rxrpc_kill_client_conn(struct rxrpc_connection *conn)
void rxrpc_kill_client_conn(struct rxrpc_connection *conn)
{
struct rxrpc_local *local = conn->local;
struct rxrpc_net *rxnet = local->rxnet;
......@@ -981,23 +981,6 @@ static void rxrpc_kill_client_conn(struct rxrpc_connection *conn)
atomic_dec(&rxnet->nr_client_conns);
rxrpc_put_client_connection_id(conn);
rxrpc_kill_connection(conn);
}
/*
* Clean up a dead client connections.
*/
void rxrpc_put_client_conn(struct rxrpc_connection *conn,
enum rxrpc_conn_trace why)
{
unsigned int debug_id = conn->debug_id;
bool dead;
int r;
dead = __refcount_dec_and_test(&conn->ref, &r);
trace_rxrpc_conn(debug_id, r - 1, why);
if (dead)
rxrpc_kill_client_conn(conn);
}
/*
......@@ -1023,7 +1006,7 @@ void rxrpc_discard_expired_client_conns(struct work_struct *work)
}
/* Don't double up on the discarding */
if (!spin_trylock(&rxnet->client_conn_discard_lock)) {
if (!mutex_trylock(&rxnet->client_conn_discard_lock)) {
_leave(" [already]");
return;
}
......@@ -1061,6 +1044,7 @@ void rxrpc_discard_expired_client_conns(struct work_struct *work)
goto not_yet_expired;
}
atomic_dec(&conn->active);
trace_rxrpc_client(conn, -1, rxrpc_client_discard);
list_del_init(&conn->cache_link);
......@@ -1087,7 +1071,7 @@ void rxrpc_discard_expired_client_conns(struct work_struct *work)
out:
spin_unlock(&rxnet->client_conn_cache_lock);
spin_unlock(&rxnet->client_conn_discard_lock);
mutex_unlock(&rxnet->client_conn_discard_lock);
_leave("");
}
......@@ -1127,6 +1111,7 @@ void rxrpc_clean_up_local_conns(struct rxrpc_local *local)
list_for_each_entry_safe(conn, tmp, &rxnet->idle_client_conns,
cache_link) {
if (conn->local == local) {
atomic_dec(&conn->active);
trace_rxrpc_client(conn, -1, rxrpc_client_discard);
list_move(&conn->cache_link, &graveyard);
}
......
......@@ -478,8 +478,4 @@ void rxrpc_process_connection(struct work_struct *work)
rxrpc_do_process_connection(conn);
rxrpc_unuse_local(conn->local, rxrpc_local_unuse_conn_work);
}
rxrpc_put_connection(conn, rxrpc_conn_put_work);
_leave("");
return;
}
......@@ -19,7 +19,9 @@
unsigned int __read_mostly rxrpc_connection_expiry = 10 * 60;
unsigned int __read_mostly rxrpc_closed_conn_expiry = 10;
static void rxrpc_destroy_connection(struct rcu_head *);
static void rxrpc_clean_up_connection(struct work_struct *work);
static void rxrpc_set_service_reap_timer(struct rxrpc_net *rxnet,
unsigned long reap_at);
static void rxrpc_connection_timer(struct timer_list *timer)
{
......@@ -32,7 +34,8 @@ static void rxrpc_connection_timer(struct timer_list *timer)
/*
* allocate a new connection
*/
struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *rxnet,
gfp_t gfp)
{
struct rxrpc_connection *conn;
......@@ -42,10 +45,12 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
if (conn) {
INIT_LIST_HEAD(&conn->cache_link);
timer_setup(&conn->timer, &rxrpc_connection_timer, 0);
INIT_WORK(&conn->processor, &rxrpc_process_connection);
INIT_WORK(&conn->processor, rxrpc_process_connection);
INIT_WORK(&conn->destructor, rxrpc_clean_up_connection);
INIT_LIST_HEAD(&conn->proc_link);
INIT_LIST_HEAD(&conn->link);
skb_queue_head_init(&conn->rx_queue);
conn->rxnet = rxnet;
conn->security = &rxrpc_no_security;
spin_lock_init(&conn->state_lock);
conn->debug_id = atomic_inc_return(&rxrpc_debug_id);
......@@ -224,53 +229,20 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)
set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);
conn->idle_timestamp = jiffies;
}
/*
* Kill off a connection.
*/
void rxrpc_kill_connection(struct rxrpc_connection *conn)
{
struct rxrpc_net *rxnet = conn->local->rxnet;
ASSERT(!rcu_access_pointer(conn->channels[0].call) &&
!rcu_access_pointer(conn->channels[1].call) &&
!rcu_access_pointer(conn->channels[2].call) &&
!rcu_access_pointer(conn->channels[3].call));
ASSERT(list_empty(&conn->cache_link));
write_lock(&rxnet->conn_lock);
list_del_init(&conn->proc_link);
write_unlock(&rxnet->conn_lock);
/* Drain the Rx queue. Note that even though we've unpublished, an
* incoming packet could still be being added to our Rx queue, so we
* will need to drain it again in the RCU cleanup handler.
*/
rxrpc_purge_queue(&conn->rx_queue);
/* Leave final destruction to RCU. The connection processor work item
* must carry a ref on the connection to prevent us getting here whilst
* it is queued or running.
*/
call_rcu(&conn->rcu, rxrpc_destroy_connection);
if (atomic_dec_and_test(&conn->active))
rxrpc_set_service_reap_timer(conn->rxnet,
jiffies + rxrpc_connection_expiry);
}
/*
* Queue a connection's work processor, getting a ref to pass to the work
* queue.
*/
bool rxrpc_queue_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why)
void rxrpc_queue_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why)
{
int r;
if (!__refcount_inc_not_zero(&conn->ref, &r))
return false;
if (rxrpc_queue_work(&conn->processor))
trace_rxrpc_conn(conn->debug_id, why, r + 1);
else
rxrpc_put_connection(conn, rxrpc_conn_put_already_queued);
return true;
if (atomic_read(&conn->active) >= 0 &&
rxrpc_queue_work(&conn->processor))
rxrpc_see_connection(conn, why);
}
/*
......@@ -327,51 +299,96 @@ static void rxrpc_set_service_reap_timer(struct rxrpc_net *rxnet,
timer_reduce(&rxnet->service_conn_reap_timer, reap_at);
}
/*
* Release a service connection
*/
void rxrpc_put_service_conn(struct rxrpc_connection *conn,
enum rxrpc_conn_trace why)
{
unsigned int debug_id = conn->debug_id;
int r;
__refcount_dec(&conn->ref, &r);
trace_rxrpc_conn(debug_id, r - 1, why);
if (r - 1 == 1)
rxrpc_set_service_reap_timer(conn->local->rxnet,
jiffies + rxrpc_connection_expiry);
}
/*
* destroy a virtual connection
*/
static void rxrpc_destroy_connection(struct rcu_head *rcu)
static void rxrpc_rcu_free_connection(struct rcu_head *rcu)
{
struct rxrpc_connection *conn =
container_of(rcu, struct rxrpc_connection, rcu);
struct rxrpc_net *rxnet = conn->rxnet;
_enter("{%d,u=%d}", conn->debug_id, refcount_read(&conn->ref));
trace_rxrpc_conn(conn->debug_id, refcount_read(&conn->ref),
rxrpc_conn_free);
kfree(conn);
if (atomic_dec_and_test(&rxnet->nr_conns))
wake_up_var(&rxnet->nr_conns);
}
/*
* Clean up a dead connection.
*/
static void rxrpc_clean_up_connection(struct work_struct *work)
{
struct rxrpc_connection *conn =
container_of(work, struct rxrpc_connection, destructor);
struct rxrpc_net *rxnet = conn->rxnet;
ASSERTCMP(refcount_read(&conn->ref), ==, 0);
ASSERT(!rcu_access_pointer(conn->channels[0].call) &&
!rcu_access_pointer(conn->channels[1].call) &&
!rcu_access_pointer(conn->channels[2].call) &&
!rcu_access_pointer(conn->channels[3].call));
ASSERT(list_empty(&conn->cache_link));
del_timer_sync(&conn->timer);
cancel_work_sync(&conn->processor); /* Processing may restart the timer */
del_timer_sync(&conn->timer);
write_lock(&rxnet->conn_lock);
list_del_init(&conn->proc_link);
write_unlock(&rxnet->conn_lock);
rxrpc_purge_queue(&conn->rx_queue);
rxrpc_kill_client_conn(conn);
conn->security->clear(conn);
key_put(conn->key);
rxrpc_put_bundle(conn->bundle, rxrpc_bundle_put_conn);
rxrpc_put_peer(conn->peer, rxrpc_peer_put_conn);
if (atomic_dec_and_test(&conn->local->rxnet->nr_conns))
wake_up_var(&conn->local->rxnet->nr_conns);
rxrpc_put_local(conn->local, rxrpc_local_put_kill_conn);
kfree(conn);
_leave("");
/* Drain the Rx queue. Note that even though we've unpublished, an
* incoming packet could still be being added to our Rx queue, so we
* will need to drain it again in the RCU cleanup handler.
*/
rxrpc_purge_queue(&conn->rx_queue);
call_rcu(&conn->rcu, rxrpc_rcu_free_connection);
}
/*
* Drop a ref on a connection.
*/
void rxrpc_put_connection(struct rxrpc_connection *conn,
enum rxrpc_conn_trace why)
{
unsigned int debug_id;
bool dead;
int r;
if (!conn)
return;
debug_id = conn->debug_id;
dead = __refcount_dec_and_test(&conn->ref, &r);
trace_rxrpc_conn(debug_id, r - 1, why);
if (dead) {
del_timer(&conn->timer);
cancel_work(&conn->processor);
if (in_softirq() || work_busy(&conn->processor) ||
timer_pending(&conn->timer))
/* Can't use the rxrpc workqueue as we need to cancel/flush
* something that may be running/waiting there.
*/
schedule_work(&conn->destructor);
else
rxrpc_clean_up_connection(&conn->destructor);
}
}
/*
......@@ -383,6 +400,7 @@ void rxrpc_service_connection_reaper(struct work_struct *work)
struct rxrpc_net *rxnet =
container_of(work, struct rxrpc_net, service_conn_reaper);
unsigned long expire_at, earliest, idle_timestamp, now;
int active;
LIST_HEAD(graveyard);
......@@ -393,8 +411,8 @@ void rxrpc_service_connection_reaper(struct work_struct *work)
write_lock(&rxnet->conn_lock);
list_for_each_entry_safe(conn, _p, &rxnet->service_conns, link) {
ASSERTCMP(refcount_read(&conn->ref), >, 0);
if (likely(refcount_read(&conn->ref) > 1))
ASSERTCMP(atomic_read(&conn->active), >=, 0);
if (likely(atomic_read(&conn->active) > 0))
continue;
if (conn->state == RXRPC_CONN_SERVICE_PREALLOC)
continue;
......@@ -405,8 +423,8 @@ void rxrpc_service_connection_reaper(struct work_struct *work)
if (conn->local->service_closed)
expire_at = idle_timestamp + rxrpc_closed_conn_expiry * HZ;
_debug("reap CONN %d { u=%d,t=%ld }",
conn->debug_id, refcount_read(&conn->ref),
_debug("reap CONN %d { a=%d,t=%ld }",
conn->debug_id, atomic_read(&conn->active),
(long)expire_at - (long)now);
if (time_before(now, expire_at)) {
......@@ -416,10 +434,11 @@ void rxrpc_service_connection_reaper(struct work_struct *work)
}
}
/* The usage count sits at 1 whilst the object is unused on the
* list; we reduce that to 0 to make the object unavailable.
/* The activity count sits at 0 whilst the conn is unused on
* the list; we reduce that to -1 to make the conn unavailable.
*/
if (!refcount_dec_if_one(&conn->ref))
active = 0;
if (!atomic_try_cmpxchg(&conn->active, &active, -1))
continue;
rxrpc_see_connection(conn, rxrpc_conn_see_reap_service);
......@@ -443,8 +462,8 @@ void rxrpc_service_connection_reaper(struct work_struct *work)
link);
list_del_init(&conn->link);
ASSERTCMP(refcount_read(&conn->ref), ==, 0);
rxrpc_kill_connection(conn);
ASSERTCMP(atomic_read(&conn->active), ==, -1);
rxrpc_put_connection(conn, rxrpc_conn_put_service_reaped);
}
_leave("");
......
......@@ -125,7 +125,7 @@ static void rxrpc_publish_service_conn(struct rxrpc_peer *peer,
struct rxrpc_connection *rxrpc_prealloc_service_connection(struct rxrpc_net *rxnet,
gfp_t gfp)
{
struct rxrpc_connection *conn = rxrpc_alloc_connection(gfp);
struct rxrpc_connection *conn = rxrpc_alloc_connection(rxnet, gfp);
if (conn) {
/* We maintain an extra ref on the connection whilst it is on
......@@ -181,6 +181,8 @@ void rxrpc_new_incoming_connection(struct rxrpc_sock *rx,
conn->service_id == rx->service_upgrade.from)
conn->service_id = rx->service_upgrade.to;
atomic_set(&conn->active, 1);
/* Make the connection a target for incoming packets. */
rxrpc_publish_service_conn(conn->peer, conn);
}
......
......@@ -65,7 +65,7 @@ static __net_init int rxrpc_init_net(struct net *net)
atomic_set(&rxnet->nr_client_conns, 0);
rxnet->kill_all_client_conns = false;
spin_lock_init(&rxnet->client_conn_cache_lock);
spin_lock_init(&rxnet->client_conn_discard_lock);
mutex_init(&rxnet->client_conn_discard_lock);
INIT_LIST_HEAD(&rxnet->idle_client_conns);
INIT_WORK(&rxnet->client_conn_reaper,
rxrpc_discard_expired_client_conns);
......
......@@ -159,7 +159,7 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
seq_puts(seq,
"Proto Local "
" Remote "
" SvID ConnID End Use State Key "
" SvID ConnID End Ref Act State Key "
" Serial ISerial CallId0 CallId1 CallId2 CallId3\n"
);
return 0;
......@@ -177,7 +177,7 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
sprintf(rbuff, "%pISpc", &conn->peer->srx.transport);
print:
seq_printf(seq,
"UDP %-47.47s %-47.47s %4x %08x %s %3u"
"UDP %-47.47s %-47.47s %4x %08x %s %3u %3d"
" %s %08x %08x %08x %08x %08x %08x %08x\n",
lbuff,
rbuff,
......@@ -185,6 +185,7 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
conn->proto.cid,
rxrpc_conn_is_service(conn) ? "Svc" : "Clt",
refcount_read(&conn->ref),
atomic_read(&conn->active),
rxrpc_conn_states[conn->state],
key_serial(conn->key),
atomic_read(&conn->serial),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment