Commit e34d4234 authored by David Howells's avatar David Howells

rxrpc: Trace rxrpc_call usage

Add a trace event for debuging rxrpc_call struct usage.
Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
parent f5c17aae
...@@ -16,6 +16,45 @@ ...@@ -16,6 +16,45 @@
#include <linux/tracepoint.h> #include <linux/tracepoint.h>
TRACE_EVENT(rxrpc_call,
TP_PROTO(struct rxrpc_call *call, int op, int usage, int nskb,
const void *where, const void *aux),
TP_ARGS(call, op, usage, nskb, where, aux),
TP_STRUCT__entry(
__field(struct rxrpc_call *, call )
__field(int, op )
__field(int, usage )
__field(int, nskb )
__field(const void *, where )
__field(const void *, aux )
),
TP_fast_assign(
__entry->call = call;
__entry->op = op;
__entry->usage = usage;
__entry->nskb = nskb;
__entry->where = where;
__entry->aux = aux;
),
TP_printk("c=%p %s u=%d s=%d p=%pSR a=%p",
__entry->call,
(__entry->op == 0 ? "NWc" :
__entry->op == 1 ? "NWs" :
__entry->op == 2 ? "SEE" :
__entry->op == 3 ? "GET" :
__entry->op == 4 ? "Gsb" :
__entry->op == 5 ? "PUT" :
"Psb"),
__entry->usage,
__entry->nskb,
__entry->where,
__entry->aux)
);
TRACE_EVENT(rxrpc_skb, TRACE_EVENT(rxrpc_skb,
TP_PROTO(struct sk_buff *skb, int op, int usage, int mod_count, TP_PROTO(struct sk_buff *skb, int op, int usage, int mod_count,
const void *where), const void *where),
......
...@@ -543,7 +543,11 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *, ...@@ -543,7 +543,11 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *,
struct sk_buff *); struct sk_buff *);
void rxrpc_release_call(struct rxrpc_call *); void rxrpc_release_call(struct rxrpc_call *);
void rxrpc_release_calls_on_socket(struct rxrpc_sock *); void rxrpc_release_calls_on_socket(struct rxrpc_sock *);
void __rxrpc_put_call(struct rxrpc_call *); void rxrpc_see_call(struct rxrpc_call *);
void rxrpc_get_call(struct rxrpc_call *);
void rxrpc_put_call(struct rxrpc_call *);
void rxrpc_get_call_for_skb(struct rxrpc_call *, struct sk_buff *);
void rxrpc_put_call_for_skb(struct rxrpc_call *, struct sk_buff *);
void __exit rxrpc_destroy_all_calls(void); void __exit rxrpc_destroy_all_calls(void);
static inline bool rxrpc_is_service_call(const struct rxrpc_call *call) static inline bool rxrpc_is_service_call(const struct rxrpc_call *call)
...@@ -1022,16 +1026,3 @@ do { \ ...@@ -1022,16 +1026,3 @@ do { \
} while (0) } while (0)
#endif /* __KDEBUGALL */ #endif /* __KDEBUGALL */
#define rxrpc_get_call(CALL) \
do { \
CHECK_SLAB_OKAY(&(CALL)->usage); \
if (atomic_inc_return(&(CALL)->usage) == 1) \
BUG(); \
} while (0)
#define rxrpc_put_call(CALL) \
do { \
__rxrpc_put_call(CALL); \
} while (0)
...@@ -129,8 +129,7 @@ static int rxrpc_accept_incoming_call(struct rxrpc_local *local, ...@@ -129,8 +129,7 @@ static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
_debug("conn ready"); _debug("conn ready");
call->state = RXRPC_CALL_SERVER_ACCEPTING; call->state = RXRPC_CALL_SERVER_ACCEPTING;
list_add_tail(&call->accept_link, &rx->acceptq); list_add_tail(&call->accept_link, &rx->acceptq);
rxrpc_get_call(call); rxrpc_get_call_for_skb(call, notification);
atomic_inc(&call->skb_count);
nsp = rxrpc_skb(notification); nsp = rxrpc_skb(notification);
nsp->call = call; nsp->call = call;
...@@ -323,6 +322,7 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx, ...@@ -323,6 +322,7 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link); call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link);
list_del_init(&call->accept_link); list_del_init(&call->accept_link);
sk_acceptq_removed(&rx->sk); sk_acceptq_removed(&rx->sk);
rxrpc_see_call(call);
write_lock_bh(&call->state_lock); write_lock_bh(&call->state_lock);
switch (call->state) { switch (call->state) {
...@@ -395,6 +395,7 @@ int rxrpc_reject_call(struct rxrpc_sock *rx) ...@@ -395,6 +395,7 @@ int rxrpc_reject_call(struct rxrpc_sock *rx)
call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link); call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link);
list_del_init(&call->accept_link); list_del_init(&call->accept_link);
sk_acceptq_removed(&rx->sk); sk_acceptq_removed(&rx->sk);
rxrpc_see_call(call);
write_lock_bh(&call->state_lock); write_lock_bh(&call->state_lock);
switch (call->state) { switch (call->state) {
......
...@@ -465,8 +465,7 @@ static void rxrpc_insert_oos_packet(struct rxrpc_call *call, ...@@ -465,8 +465,7 @@ static void rxrpc_insert_oos_packet(struct rxrpc_call *call,
skb->destructor = rxrpc_packet_destructor; skb->destructor = rxrpc_packet_destructor;
ASSERTCMP(sp->call, ==, NULL); ASSERTCMP(sp->call, ==, NULL);
sp->call = call; sp->call = call;
rxrpc_get_call(call); rxrpc_get_call_for_skb(call, skb);
atomic_inc(&call->skb_count);
/* insert into the buffer in sequence order */ /* insert into the buffer in sequence order */
spin_lock_bh(&call->lock); spin_lock_bh(&call->lock);
...@@ -741,8 +740,7 @@ static int rxrpc_process_rx_queue(struct rxrpc_call *call, ...@@ -741,8 +740,7 @@ static int rxrpc_process_rx_queue(struct rxrpc_call *call,
_debug("post ACK"); _debug("post ACK");
skb->mark = RXRPC_SKB_MARK_FINAL_ACK; skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
sp->call = call; sp->call = call;
rxrpc_get_call(call); rxrpc_get_call_for_skb(call, skb);
atomic_inc(&call->skb_count);
spin_lock_bh(&call->lock); spin_lock_bh(&call->lock);
if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0) if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
BUG(); BUG();
...@@ -801,8 +799,7 @@ static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error, ...@@ -801,8 +799,7 @@ static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
memset(sp, 0, sizeof(*sp)); memset(sp, 0, sizeof(*sp));
sp->error = error; sp->error = error;
sp->call = call; sp->call = call;
rxrpc_get_call(call); rxrpc_get_call_for_skb(call, skb);
atomic_inc(&call->skb_count);
spin_lock_bh(&call->lock); spin_lock_bh(&call->lock);
ret = rxrpc_queue_rcv_skb(call, skb, true, fatal); ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
...@@ -834,6 +831,8 @@ void rxrpc_process_call(struct work_struct *work) ...@@ -834,6 +831,8 @@ void rxrpc_process_call(struct work_struct *work)
u32 serial, abort_code = RX_PROTOCOL_ERROR; u32 serial, abort_code = RX_PROTOCOL_ERROR;
u8 *acks = NULL; u8 *acks = NULL;
rxrpc_see_call(call);
//printk("\n--------------------\n"); //printk("\n--------------------\n");
_enter("{%d,%s,%lx} [%lu]", _enter("{%d,%s,%lx} [%lu]",
call->debug_id, rxrpc_call_states[call->state], call->events, call->debug_id, rxrpc_call_states[call->state], call->events,
......
...@@ -219,6 +219,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx, ...@@ -219,6 +219,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
{ {
struct rxrpc_call *call, *xcall; struct rxrpc_call *call, *xcall;
struct rb_node *parent, **pp; struct rb_node *parent, **pp;
const void *here = __builtin_return_address(0);
int ret; int ret;
_enter("%p,%lx", rx, user_call_ID); _enter("%p,%lx", rx, user_call_ID);
...@@ -229,6 +230,9 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx, ...@@ -229,6 +230,9 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
return call; return call;
} }
trace_rxrpc_call(call, 0, atomic_read(&call->usage), 0, here,
(const void *)user_call_ID);
/* Publish the call, even though it is incompletely set up as yet */ /* Publish the call, even though it is incompletely set up as yet */
call->user_call_ID = user_call_ID; call->user_call_ID = user_call_ID;
__set_bit(RXRPC_CALL_HAS_USERID, &call->flags); __set_bit(RXRPC_CALL_HAS_USERID, &call->flags);
...@@ -308,6 +312,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx, ...@@ -308,6 +312,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
{ {
struct rxrpc_skb_priv *sp = rxrpc_skb(skb); struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
struct rxrpc_call *call, *candidate; struct rxrpc_call *call, *candidate;
const void *here = __builtin_return_address(0);
u32 call_id, chan; u32 call_id, chan;
_enter(",%d", conn->debug_id); _enter(",%d", conn->debug_id);
...@@ -318,6 +323,9 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx, ...@@ -318,6 +323,9 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
if (!candidate) if (!candidate)
return ERR_PTR(-EBUSY); return ERR_PTR(-EBUSY);
trace_rxrpc_call(candidate, 1, atomic_read(&candidate->usage),
0, here, NULL);
chan = sp->hdr.cid & RXRPC_CHANNELMASK; chan = sp->hdr.cid & RXRPC_CHANNELMASK;
candidate->socket = rx; candidate->socket = rx;
candidate->conn = conn; candidate->conn = conn;
...@@ -430,6 +438,44 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx, ...@@ -430,6 +438,44 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
return ERR_PTR(-ECONNRESET); return ERR_PTR(-ECONNRESET);
} }
/*
* Note the re-emergence of a call.
*/
void rxrpc_see_call(struct rxrpc_call *call)
{
const void *here = __builtin_return_address(0);
if (call) {
int n = atomic_read(&call->usage);
int m = atomic_read(&call->skb_count);
trace_rxrpc_call(call, 2, n, m, here, 0);
}
}
/*
* Note the addition of a ref on a call.
*/
void rxrpc_get_call(struct rxrpc_call *call)
{
const void *here = __builtin_return_address(0);
int n = atomic_inc_return(&call->usage);
int m = atomic_read(&call->skb_count);
trace_rxrpc_call(call, 3, n, m, here, 0);
}
/*
* Note the addition of a ref on a call for a socket buffer.
*/
void rxrpc_get_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
{
const void *here = __builtin_return_address(0);
int n = atomic_inc_return(&call->usage);
int m = atomic_inc_return(&call->skb_count);
trace_rxrpc_call(call, 4, n, m, here, skb);
}
/* /*
* detach a call from a socket and set up for release * detach a call from a socket and set up for release
*/ */
...@@ -443,6 +489,8 @@ void rxrpc_release_call(struct rxrpc_call *call) ...@@ -443,6 +489,8 @@ void rxrpc_release_call(struct rxrpc_call *call)
atomic_read(&call->ackr_not_idle), atomic_read(&call->ackr_not_idle),
call->rx_first_oos); call->rx_first_oos);
rxrpc_see_call(call);
spin_lock_bh(&call->lock); spin_lock_bh(&call->lock);
if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags)) if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
BUG(); BUG();
...@@ -526,6 +574,7 @@ static void rxrpc_dead_call_expired(unsigned long _call) ...@@ -526,6 +574,7 @@ static void rxrpc_dead_call_expired(unsigned long _call)
_enter("{%d}", call->debug_id); _enter("{%d}", call->debug_id);
rxrpc_see_call(call);
write_lock_bh(&call->state_lock); write_lock_bh(&call->state_lock);
call->state = RXRPC_CALL_DEAD; call->state = RXRPC_CALL_DEAD;
write_unlock_bh(&call->state_lock); write_unlock_bh(&call->state_lock);
...@@ -540,6 +589,7 @@ static void rxrpc_mark_call_released(struct rxrpc_call *call) ...@@ -540,6 +589,7 @@ static void rxrpc_mark_call_released(struct rxrpc_call *call)
{ {
bool sched; bool sched;
rxrpc_see_call(call);
write_lock(&call->state_lock); write_lock(&call->state_lock);
if (call->state < RXRPC_CALL_DEAD) { if (call->state < RXRPC_CALL_DEAD) {
sched = __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET); sched = __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
...@@ -585,21 +635,43 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx) ...@@ -585,21 +635,43 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
/* /*
* release a call * release a call
*/ */
void __rxrpc_put_call(struct rxrpc_call *call) void rxrpc_put_call(struct rxrpc_call *call)
{ {
ASSERT(call != NULL); const void *here = __builtin_return_address(0);
int n, m;
_enter("%p{u=%d}", call, atomic_read(&call->usage)); ASSERT(call != NULL);
ASSERTCMP(atomic_read(&call->usage), >, 0); n = atomic_dec_return(&call->usage);
m = atomic_read(&call->skb_count);
trace_rxrpc_call(call, 5, n, m, here, NULL);
ASSERTCMP(n, >=, 0);
if (n == 0) {
_debug("call %d dead", call->debug_id);
WARN_ON(m != 0);
ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
rxrpc_queue_work(&call->destroyer);
}
}
if (atomic_dec_and_test(&call->usage)) { /*
* Release a call ref held by a socket buffer.
*/
void rxrpc_put_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
{
const void *here = __builtin_return_address(0);
int n, m;
n = atomic_dec_return(&call->usage);
m = atomic_dec_return(&call->skb_count);
trace_rxrpc_call(call, 6, n, m, here, skb);
ASSERTCMP(n, >=, 0);
if (n == 0) {
_debug("call %d dead", call->debug_id); _debug("call %d dead", call->debug_id);
WARN_ON(atomic_read(&call->skb_count) != 0); WARN_ON(m != 0);
ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD); ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
rxrpc_queue_work(&call->destroyer); rxrpc_queue_work(&call->destroyer);
} }
_leave("");
} }
/* /*
...@@ -705,6 +777,7 @@ void __exit rxrpc_destroy_all_calls(void) ...@@ -705,6 +777,7 @@ void __exit rxrpc_destroy_all_calls(void)
call = list_entry(rxrpc_calls.next, struct rxrpc_call, link); call = list_entry(rxrpc_calls.next, struct rxrpc_call, link);
_debug("Zapping call %p", call); _debug("Zapping call %p", call);
rxrpc_see_call(call);
list_del_init(&call->link); list_del_init(&call->link);
switch (atomic_read(&call->usage)) { switch (atomic_read(&call->usage)) {
...@@ -748,6 +821,7 @@ static void rxrpc_call_life_expired(unsigned long _call) ...@@ -748,6 +821,7 @@ static void rxrpc_call_life_expired(unsigned long _call)
_enter("{%d}", call->debug_id); _enter("{%d}", call->debug_id);
rxrpc_see_call(call);
if (call->state >= RXRPC_CALL_COMPLETE) if (call->state >= RXRPC_CALL_COMPLETE)
return; return;
...@@ -765,6 +839,7 @@ static void rxrpc_resend_time_expired(unsigned long _call) ...@@ -765,6 +839,7 @@ static void rxrpc_resend_time_expired(unsigned long _call)
_enter("{%d}", call->debug_id); _enter("{%d}", call->debug_id);
rxrpc_see_call(call);
if (call->state >= RXRPC_CALL_COMPLETE) if (call->state >= RXRPC_CALL_COMPLETE)
return; return;
...@@ -782,6 +857,7 @@ static void rxrpc_ack_time_expired(unsigned long _call) ...@@ -782,6 +857,7 @@ static void rxrpc_ack_time_expired(unsigned long _call)
_enter("{%d}", call->debug_id); _enter("{%d}", call->debug_id);
rxrpc_see_call(call);
if (call->state >= RXRPC_CALL_COMPLETE) if (call->state >= RXRPC_CALL_COMPLETE)
return; return;
......
...@@ -537,6 +537,7 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn, ...@@ -537,6 +537,7 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
struct rxrpc_call, chan_wait_link); struct rxrpc_call, chan_wait_link);
u32 call_id = chan->call_counter + 1; u32 call_id = chan->call_counter + 1;
rxrpc_see_call(call);
list_del_init(&call->chan_wait_link); list_del_init(&call->chan_wait_link);
conn->active_chans |= 1 << channel; conn->active_chans |= 1 << channel;
call->peer = rxrpc_get_peer(conn->params.peer); call->peer = rxrpc_get_peer(conn->params.peer);
......
...@@ -157,6 +157,7 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn, ...@@ -157,6 +157,7 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn,
conn->channels[i].call, conn->channels[i].call,
lockdep_is_held(&conn->channel_lock)); lockdep_is_held(&conn->channel_lock));
if (call) { if (call) {
rxrpc_see_call(call);
write_lock_bh(&call->state_lock); write_lock_bh(&call->state_lock);
if (rxrpc_set_call_completion(call, compl, abort_code, if (rxrpc_set_call_completion(call, compl, abort_code,
error)) { error)) {
......
...@@ -196,8 +196,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call, ...@@ -196,8 +196,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call,
goto enqueue_packet; goto enqueue_packet;
sp->call = call; sp->call = call;
rxrpc_get_call(call); rxrpc_get_call_for_skb(call, skb);
atomic_inc(&call->skb_count);
terminal = ((flags & RXRPC_LAST_PACKET) && terminal = ((flags & RXRPC_LAST_PACKET) &&
!(flags & RXRPC_CLIENT_INITIATED)); !(flags & RXRPC_CLIENT_INITIATED));
ret = rxrpc_queue_rcv_skb(call, skb, false, terminal); ret = rxrpc_queue_rcv_skb(call, skb, false, terminal);
...@@ -748,6 +747,7 @@ void rxrpc_data_ready(struct sock *sk) ...@@ -748,6 +747,7 @@ void rxrpc_data_ready(struct sock *sk)
if (!call || atomic_read(&call->usage) == 0) if (!call || atomic_read(&call->usage) == 0)
goto cant_route_call; goto cant_route_call;
rxrpc_see_call(call);
rxrpc_post_packet_to_call(call, skb); rxrpc_post_packet_to_call(call, skb);
goto out_unlock; goto out_unlock;
} }
......
...@@ -207,6 +207,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) ...@@ -207,6 +207,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
return PTR_ERR(call); return PTR_ERR(call);
} }
rxrpc_see_call(call);
_debug("CALL %d USR %lx ST %d on CONN %p", _debug("CALL %d USR %lx ST %d on CONN %p",
call->debug_id, call->user_call_ID, call->state, call->conn); call->debug_id, call->user_call_ID, call->state, call->conn);
......
...@@ -270,6 +270,7 @@ void rxrpc_peer_error_distributor(struct work_struct *work) ...@@ -270,6 +270,7 @@ void rxrpc_peer_error_distributor(struct work_struct *work)
call = hlist_entry(peer->error_targets.first, call = hlist_entry(peer->error_targets.first,
struct rxrpc_call, error_link); struct rxrpc_call, error_link);
hlist_del_init(&call->error_link); hlist_del_init(&call->error_link);
rxrpc_see_call(call);
queue = false; queue = false;
write_lock(&call->state_lock); write_lock(&call->state_lock);
......
...@@ -115,6 +115,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len, ...@@ -115,6 +115,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
sp = rxrpc_skb(skb); sp = rxrpc_skb(skb);
call = sp->call; call = sp->call;
ASSERT(call != NULL); ASSERT(call != NULL);
rxrpc_see_call(call);
_debug("next pkt %s", rxrpc_pkts[sp->hdr.type]); _debug("next pkt %s", rxrpc_pkts[sp->hdr.type]);
......
...@@ -140,9 +140,7 @@ void rxrpc_packet_destructor(struct sk_buff *skb) ...@@ -140,9 +140,7 @@ void rxrpc_packet_destructor(struct sk_buff *skb)
_enter("%p{%p}", skb, call); _enter("%p{%p}", skb, call);
if (call) { if (call) {
if (atomic_dec_return(&call->skb_count) < 0) rxrpc_put_call_for_skb(call, skb);
BUG();
rxrpc_put_call(call);
sp->call = NULL; sp->call = NULL;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment