Commit 2d689424 authored by David Howells's avatar David Howells

rxrpc: Move call state changes from sendmsg to I/O thread

Move all the call state changes that are made in rxrpc_sendmsg() to the I/O
thread.  This is a step towards removing the call state lock.

This requires the switch to the RXRPC_CALL_CLIENT_AWAIT_REPLY and
RXRPC_CALL_SERVER_SEND_REPLY states to be done when the last packet is
decanted from ->tx_sendmsg to ->tx_buffer in the I/O thread, not when it is
added to ->tx_sendmsg by sendmsg().
Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
parent d41b3f5b
...@@ -880,8 +880,8 @@ The kernel interface functions are as follows: ...@@ -880,8 +880,8 @@ The kernel interface functions are as follows:
notify_end_rx can be NULL or it can be used to specify a function to be notify_end_rx can be NULL or it can be used to specify a function to be
called when the call changes state to end the Tx phase. This function is called when the call changes state to end the Tx phase. This function is
called with the call-state spinlock held to prevent any reply or final ACK called with a spinlock held to prevent the last DATA packet from being
from being delivered first. transmitted until the function returns.
(#) Receive data from a call:: (#) Receive data from a call::
......
...@@ -251,6 +251,50 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb) ...@@ -251,6 +251,50 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
_leave(""); _leave("");
} }
/*
* Start transmitting the reply to a service. This cancels the need to ACK the
* request if we haven't yet done so.
*/
static void rxrpc_begin_service_reply(struct rxrpc_call *call)
{
unsigned long now;
write_lock(&call->state_lock);
if (call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
now = jiffies;
call->state = RXRPC_CALL_SERVER_SEND_REPLY;
WRITE_ONCE(call->delay_ack_at, now + MAX_JIFFY_OFFSET);
if (call->ackr_reason == RXRPC_ACK_DELAY)
call->ackr_reason = 0;
trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now);
}
write_unlock(&call->state_lock);
}
/*
* Close the transmission phase. After this point there is no more data to be
* transmitted in the call.
*/
static void rxrpc_close_tx_phase(struct rxrpc_call *call)
{
_debug("________awaiting reply/ACK__________");
write_lock(&call->state_lock);
switch (call->state) {
case RXRPC_CALL_CLIENT_SEND_REQUEST:
call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
break;
case RXRPC_CALL_SERVER_SEND_REPLY:
call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
break;
default:
break;
}
write_unlock(&call->state_lock);
}
static bool rxrpc_tx_window_has_space(struct rxrpc_call *call) static bool rxrpc_tx_window_has_space(struct rxrpc_call *call)
{ {
unsigned int winsize = min_t(unsigned int, call->tx_winsize, unsigned int winsize = min_t(unsigned int, call->tx_winsize,
...@@ -285,6 +329,9 @@ static void rxrpc_decant_prepared_tx(struct rxrpc_call *call) ...@@ -285,6 +329,9 @@ static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
call->tx_top = txb->seq; call->tx_top = txb->seq;
list_add_tail(&txb->call_link, &call->tx_buffer); list_add_tail(&txb->call_link, &call->tx_buffer);
if (txb->wire.flags & RXRPC_LAST_PACKET)
rxrpc_close_tx_phase(call);
rxrpc_transmit_one(call, txb); rxrpc_transmit_one(call, txb);
if (!rxrpc_tx_window_has_space(call)) if (!rxrpc_tx_window_has_space(call))
...@@ -298,12 +345,11 @@ static void rxrpc_transmit_some_data(struct rxrpc_call *call) ...@@ -298,12 +345,11 @@ static void rxrpc_transmit_some_data(struct rxrpc_call *call)
case RXRPC_CALL_SERVER_ACK_REQUEST: case RXRPC_CALL_SERVER_ACK_REQUEST:
if (list_empty(&call->tx_sendmsg)) if (list_empty(&call->tx_sendmsg))
return; return;
rxrpc_begin_service_reply(call);
fallthrough; fallthrough;
case RXRPC_CALL_SERVER_SEND_REPLY: case RXRPC_CALL_SERVER_SEND_REPLY:
case RXRPC_CALL_SERVER_AWAIT_ACK:
case RXRPC_CALL_CLIENT_SEND_REQUEST: case RXRPC_CALL_CLIENT_SEND_REQUEST:
case RXRPC_CALL_CLIENT_AWAIT_REPLY:
if (!rxrpc_tx_window_has_space(call)) if (!rxrpc_tx_window_has_space(call))
return; return;
if (list_empty(&call->tx_sendmsg)) { if (list_empty(&call->tx_sendmsg)) {
......
...@@ -189,7 +189,6 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call, ...@@ -189,7 +189,6 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
struct rxrpc_txbuf *txb, struct rxrpc_txbuf *txb,
rxrpc_notify_end_tx_t notify_end_tx) rxrpc_notify_end_tx_t notify_end_tx)
{ {
unsigned long now;
rxrpc_seq_t seq = txb->seq; rxrpc_seq_t seq = txb->seq;
bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags), poke; bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags), poke;
...@@ -212,36 +211,10 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call, ...@@ -212,36 +211,10 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
poke = list_empty(&call->tx_sendmsg); poke = list_empty(&call->tx_sendmsg);
list_add_tail(&txb->call_link, &call->tx_sendmsg); list_add_tail(&txb->call_link, &call->tx_sendmsg);
call->tx_prepared = seq; call->tx_prepared = seq;
if (last)
rxrpc_notify_end_tx(rx, call, notify_end_tx);
spin_unlock(&call->tx_lock); spin_unlock(&call->tx_lock);
if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
_debug("________awaiting reply/ACK__________");
write_lock(&call->state_lock);
switch (call->state) {
case RXRPC_CALL_CLIENT_SEND_REQUEST:
call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
rxrpc_notify_end_tx(rx, call, notify_end_tx);
break;
case RXRPC_CALL_SERVER_ACK_REQUEST:
call->state = RXRPC_CALL_SERVER_SEND_REPLY;
now = jiffies;
WRITE_ONCE(call->delay_ack_at, now + MAX_JIFFY_OFFSET);
if (call->ackr_reason == RXRPC_ACK_DELAY)
call->ackr_reason = 0;
trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now);
if (!last)
break;
fallthrough;
case RXRPC_CALL_SERVER_SEND_REPLY:
call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
rxrpc_notify_end_tx(rx, call, notify_end_tx);
break;
default:
break;
}
write_unlock(&call->state_lock);
}
if (poke) if (poke)
rxrpc_poke_call(call, rxrpc_call_poke_start); rxrpc_poke_call(call, rxrpc_call_poke_start);
} }
...@@ -280,8 +253,13 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, ...@@ -280,8 +253,13 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
ret = -EPROTO; ret = -EPROTO;
if (state != RXRPC_CALL_CLIENT_SEND_REQUEST && if (state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
state != RXRPC_CALL_SERVER_ACK_REQUEST && state != RXRPC_CALL_SERVER_ACK_REQUEST &&
state != RXRPC_CALL_SERVER_SEND_REPLY) state != RXRPC_CALL_SERVER_SEND_REPLY) {
/* Request phase complete for this client call */
trace_rxrpc_abort(call->debug_id, rxrpc_sendmsg_late_send,
call->cid, call->call_id, call->rx_consumed,
0, -EPROTO);
goto maybe_error; goto maybe_error;
}
ret = -EMSGSIZE; ret = -EMSGSIZE;
if (call->tx_total_len != -1) { if (call->tx_total_len != -1) {
...@@ -573,7 +551,6 @@ rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, ...@@ -573,7 +551,6 @@ rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg,
int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
__releases(&rx->sk.sk_lock.slock) __releases(&rx->sk.sk_lock.slock)
{ {
enum rxrpc_call_state state;
struct rxrpc_call *call; struct rxrpc_call *call;
unsigned long now, j; unsigned long now, j;
bool dropped_lock = false; bool dropped_lock = false;
...@@ -672,11 +649,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) ...@@ -672,11 +649,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
break; break;
} }
state = rxrpc_call_state(call); if (rxrpc_call_is_complete(call)) {
_debug("CALL %d USR %lx ST %d on CONN %p",
call->debug_id, call->user_call_ID, state, call->conn);
if (state >= RXRPC_CALL_COMPLETE) {
/* it's too late for this call */ /* it's too late for this call */
ret = -ESHUTDOWN; ret = -ESHUTDOWN;
} else if (p.command == RXRPC_CMD_SEND_ABORT) { } else if (p.command == RXRPC_CMD_SEND_ABORT) {
...@@ -722,7 +695,7 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call, ...@@ -722,7 +695,7 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
bool dropped_lock = false; bool dropped_lock = false;
int ret; int ret;
_enter("{%d,%s},", call->debug_id, rxrpc_call_states[call->state]); _enter("{%d},", call->debug_id);
ASSERTCMP(msg->msg_name, ==, NULL); ASSERTCMP(msg->msg_name, ==, NULL);
ASSERTCMP(msg->msg_control, ==, NULL); ASSERTCMP(msg->msg_control, ==, NULL);
...@@ -732,26 +705,10 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call, ...@@ -732,26 +705,10 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
_debug("CALL %d USR %lx ST %d on CONN %p", _debug("CALL %d USR %lx ST %d on CONN %p",
call->debug_id, call->user_call_ID, call->state, call->conn); call->debug_id, call->user_call_ID, call->state, call->conn);
switch (rxrpc_call_state(call)) { ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
case RXRPC_CALL_CLIENT_SEND_REQUEST: notify_end_tx, &dropped_lock);
case RXRPC_CALL_SERVER_ACK_REQUEST: if (ret == -ESHUTDOWN)
case RXRPC_CALL_SERVER_SEND_REPLY:
ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
notify_end_tx, &dropped_lock);
break;
case RXRPC_CALL_COMPLETE:
read_lock(&call->state_lock);
ret = call->error; ret = call->error;
read_unlock(&call->state_lock);
break;
default:
/* Request phase complete for this client call */
trace_rxrpc_abort(call->debug_id, rxrpc_sendmsg_late_send,
call->cid, call->call_id, call->rx_consumed,
0, -EPROTO);
ret = -EPROTO;
break;
}
if (!dropped_lock) if (!dropped_lock)
mutex_unlock(&call->user_mutex); mutex_unlock(&call->user_mutex);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment