Commit 9740a7ae authored by David S. Miller's avatar David S. Miller

Merge branch 'mptcp-do-not-block-on-subflow-socket'

Florian Westphal says:

====================
mptcp: do not block on subflow socket

This series reworks mptcp_sendmsg logic to avoid blocking on the subflow
socket.

It does so by removing the wait loop from mptcp_sendmsg_frag helper.

In order to do that, it moves prerequisites that are currently
handled in mptcp_sendmsg_frag (and cause it to wait until they are
met, e.g. frag cache refill) into the callers.

The worker can just reschedule in case no subflow socket is ready,
since it can't wait -- doing so would block other work items and
doesn't make sense anyway because we should not (re)send data
in case resources are already low.

The sendmsg path can use the existing wait logic until memory
becomes available.

Because large send requests can result in multiple mptcp_sendmsg_frag
calls from mptcp_sendmsg, we may need to restart the socket lookup in
case subflow can't accept more data or memory is low.

Doing so blocks on the mptcp socket, and existing wait handling
releases the msk lock while blocking.

Lastly, no need to use GFP_ATOMIC for extension allocation:
extend __skb_ext_alloc with gfp_t arg instead of hard-coded ATOMIC and
then relax the allocation constraints for mptcp case: those requests
occur in process context.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents eb682677 4930f483
...@@ -4165,7 +4165,7 @@ struct skb_ext { ...@@ -4165,7 +4165,7 @@ struct skb_ext {
char data[] __aligned(8); char data[] __aligned(8);
}; };
struct skb_ext *__skb_ext_alloc(void); struct skb_ext *__skb_ext_alloc(gfp_t flags);
void *__skb_ext_set(struct sk_buff *skb, enum skb_ext_id id, void *__skb_ext_set(struct sk_buff *skb, enum skb_ext_id id,
struct skb_ext *ext); struct skb_ext *ext);
void *skb_ext_add(struct sk_buff *skb, enum skb_ext_id id); void *skb_ext_add(struct sk_buff *skb, enum skb_ext_id id);
......
...@@ -6087,13 +6087,15 @@ static void *skb_ext_get_ptr(struct skb_ext *ext, enum skb_ext_id id) ...@@ -6087,13 +6087,15 @@ static void *skb_ext_get_ptr(struct skb_ext *ext, enum skb_ext_id id)
/** /**
* __skb_ext_alloc - allocate a new skb extensions storage * __skb_ext_alloc - allocate a new skb extensions storage
* *
* @flags: See kmalloc().
*
* Returns the newly allocated pointer. The pointer can later attached to a * Returns the newly allocated pointer. The pointer can later attached to a
* skb via __skb_ext_set(). * skb via __skb_ext_set().
* Note: caller must handle the skb_ext as an opaque data. * Note: caller must handle the skb_ext as an opaque data.
*/ */
struct skb_ext *__skb_ext_alloc(void) struct skb_ext *__skb_ext_alloc(gfp_t flags)
{ {
struct skb_ext *new = kmem_cache_alloc(skbuff_ext_cache, GFP_ATOMIC); struct skb_ext *new = kmem_cache_alloc(skbuff_ext_cache, flags);
if (new) { if (new) {
memset(new->offset, 0, sizeof(new->offset)); memset(new->offset, 0, sizeof(new->offset));
...@@ -6188,7 +6190,7 @@ void *skb_ext_add(struct sk_buff *skb, enum skb_ext_id id) ...@@ -6188,7 +6190,7 @@ void *skb_ext_add(struct sk_buff *skb, enum skb_ext_id id)
} else { } else {
newoff = SKB_EXT_CHUNKSIZEOF(*new); newoff = SKB_EXT_CHUNKSIZEOF(*new);
new = __skb_ext_alloc(); new = __skb_ext_alloc(GFP_ATOMIC);
if (!new) if (!new)
return NULL; return NULL;
} }
......
...@@ -367,8 +367,10 @@ static void mptcp_stop_timer(struct sock *sk) ...@@ -367,8 +367,10 @@ static void mptcp_stop_timer(struct sock *sk)
static bool mptcp_ext_cache_refill(struct mptcp_sock *msk) static bool mptcp_ext_cache_refill(struct mptcp_sock *msk)
{ {
const struct sock *sk = (const struct sock *)msk;
if (!msk->cached_ext) if (!msk->cached_ext)
msk->cached_ext = __skb_ext_alloc(); msk->cached_ext = __skb_ext_alloc(sk->sk_allocation);
return !!msk->cached_ext; return !!msk->cached_ext;
} }
...@@ -510,20 +512,6 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk, ...@@ -510,20 +512,6 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
* fooled into a warning if we don't init here * fooled into a warning if we don't init here
*/ */
pfrag = sk_page_frag(sk); pfrag = sk_page_frag(sk);
while ((!retransmission && !mptcp_page_frag_refill(ssk, pfrag)) ||
!mptcp_ext_cache_refill(msk)) {
ret = sk_stream_wait_memory(ssk, timeo);
if (ret)
return ret;
/* if sk_stream_wait_memory() sleeps snd_una can change
* significantly, refresh the rtx queue
*/
mptcp_clean_una(sk);
if (unlikely(__mptcp_needs_tcp_fallback(msk)))
return 0;
}
if (!retransmission) { if (!retransmission) {
write_seq = &msk->write_seq; write_seq = &msk->write_seq;
page = pfrag->page; page = pfrag->page;
...@@ -590,7 +578,7 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk, ...@@ -590,7 +578,7 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
* access the skb after the sendpages call * access the skb after the sendpages call
*/ */
ret = do_tcp_sendpages(ssk, page, offset, psize, ret = do_tcp_sendpages(ssk, page, offset, psize,
msg->msg_flags | MSG_SENDPAGE_NOTLAST); msg->msg_flags | MSG_SENDPAGE_NOTLAST | MSG_DONTWAIT);
if (ret <= 0) if (ret <= 0)
return ret; return ret;
...@@ -653,6 +641,15 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk, ...@@ -653,6 +641,15 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
return ret; return ret;
} }
static void mptcp_nospace(struct mptcp_sock *msk, struct socket *sock)
{
clear_bit(MPTCP_SEND_SPACE, &msk->flags);
smp_mb__after_atomic(); /* msk->flags is changed by write_space cb */
/* enables sk->write_space() callbacks */
set_bit(SOCK_NOSPACE, &sock->flags);
}
static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk) static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk)
{ {
struct mptcp_subflow_context *subflow; struct mptcp_subflow_context *subflow;
...@@ -660,19 +657,17 @@ static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk) ...@@ -660,19 +657,17 @@ static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk)
sock_owned_by_me((const struct sock *)msk); sock_owned_by_me((const struct sock *)msk);
if (!mptcp_ext_cache_refill(msk))
return NULL;
mptcp_for_each_subflow(msk, subflow) { mptcp_for_each_subflow(msk, subflow) {
struct sock *ssk = mptcp_subflow_tcp_sock(subflow); struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
if (!sk_stream_memory_free(ssk)) { if (!sk_stream_memory_free(ssk)) {
struct socket *sock = ssk->sk_socket; struct socket *sock = ssk->sk_socket;
if (sock) { if (sock)
clear_bit(MPTCP_SEND_SPACE, &msk->flags); mptcp_nospace(msk, sock);
smp_mb__after_atomic();
/* enables sk->write_space() callbacks */
set_bit(SOCK_NOSPACE, &sock->flags);
}
return NULL; return NULL;
} }
...@@ -698,22 +693,19 @@ static void ssk_check_wmem(struct mptcp_sock *msk, struct sock *ssk) ...@@ -698,22 +693,19 @@ static void ssk_check_wmem(struct mptcp_sock *msk, struct sock *ssk)
return; return;
sock = READ_ONCE(ssk->sk_socket); sock = READ_ONCE(ssk->sk_socket);
if (sock)
if (sock) { mptcp_nospace(msk, sock);
clear_bit(MPTCP_SEND_SPACE, &msk->flags);
smp_mb__after_atomic();
/* set NOSPACE only after clearing SEND_SPACE flag */
set_bit(SOCK_NOSPACE, &sock->flags);
}
} }
static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len) static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
{ {
int mss_now = 0, size_goal = 0, ret = 0; int mss_now = 0, size_goal = 0, ret = 0;
struct mptcp_sock *msk = mptcp_sk(sk); struct mptcp_sock *msk = mptcp_sk(sk);
struct page_frag *pfrag;
struct socket *ssock; struct socket *ssock;
size_t copied = 0; size_t copied = 0;
struct sock *ssk; struct sock *ssk;
bool tx_ok;
long timeo; long timeo;
if (msg->msg_flags & ~(MSG_MORE | MSG_DONTWAIT | MSG_NOSIGNAL)) if (msg->msg_flags & ~(MSG_MORE | MSG_DONTWAIT | MSG_NOSIGNAL))
...@@ -738,11 +730,29 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len) ...@@ -738,11 +730,29 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
return ret >= 0 ? ret + copied : (copied ? copied : ret); return ret >= 0 ? ret + copied : (copied ? copied : ret);
} }
pfrag = sk_page_frag(sk);
restart:
mptcp_clean_una(sk); mptcp_clean_una(sk);
wait_for_sndbuf:
__mptcp_flush_join_list(msk); __mptcp_flush_join_list(msk);
ssk = mptcp_subflow_get_send(msk); ssk = mptcp_subflow_get_send(msk);
while (!sk_stream_memory_free(sk) || !ssk) { while (!sk_stream_memory_free(sk) ||
!ssk ||
!mptcp_page_frag_refill(ssk, pfrag)) {
if (ssk) {
/* make sure retransmit timer is
* running before we wait for memory.
*
* The retransmit timer might be needed
* to make the peer send an up-to-date
* MPTCP Ack.
*/
mptcp_set_timeout(sk, ssk);
if (!mptcp_timer_pending(sk))
mptcp_reset_timer(sk);
}
ret = sk_stream_wait_memory(sk, &timeo); ret = sk_stream_wait_memory(sk, &timeo);
if (ret) if (ret)
goto out; goto out;
...@@ -759,11 +769,18 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len) ...@@ -759,11 +769,18 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
pr_debug("conn_list->subflow=%p", ssk); pr_debug("conn_list->subflow=%p", ssk);
lock_sock(ssk); lock_sock(ssk);
while (msg_data_left(msg)) { tx_ok = msg_data_left(msg);
while (tx_ok) {
ret = mptcp_sendmsg_frag(sk, ssk, msg, NULL, &timeo, &mss_now, ret = mptcp_sendmsg_frag(sk, ssk, msg, NULL, &timeo, &mss_now,
&size_goal); &size_goal);
if (ret < 0) if (ret < 0) {
if (ret == -EAGAIN && timeo > 0) {
mptcp_set_timeout(sk, ssk);
release_sock(ssk);
goto restart;
}
break; break;
}
if (ret == 0 && unlikely(__mptcp_needs_tcp_fallback(msk))) { if (ret == 0 && unlikely(__mptcp_needs_tcp_fallback(msk))) {
/* Can happen for passive sockets: /* Can happen for passive sockets:
* 3WHS negotiated MPTCP, but first packet after is * 3WHS negotiated MPTCP, but first packet after is
...@@ -777,6 +794,50 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len) ...@@ -777,6 +794,50 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
} }
copied += ret; copied += ret;
tx_ok = msg_data_left(msg);
if (!tx_ok)
break;
if (!sk_stream_memory_free(ssk) ||
!mptcp_page_frag_refill(ssk, pfrag) ||
!mptcp_ext_cache_refill(msk)) {
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
tcp_push(ssk, msg->msg_flags, mss_now,
tcp_sk(ssk)->nonagle, size_goal);
mptcp_set_timeout(sk, ssk);
release_sock(ssk);
goto restart;
}
/* memory is charged to mptcp level socket as well, i.e.
* if msg is very large, mptcp socket may run out of buffer
* space. mptcp_clean_una() will release data that has
* been acked at mptcp level in the mean time, so there is
* a good chance we can continue sending data right away.
*
* Normally, when the tcp subflow can accept more data, then
* so can the MPTCP socket. However, we need to cope with
* peers that might lag behind in their MPTCP-level
* acknowledgements, i.e. data might have been acked at
* tcp level only. So, we must also check the MPTCP socket
* limits before we send more data.
*/
if (unlikely(!sk_stream_memory_free(sk))) {
tcp_push(ssk, msg->msg_flags, mss_now,
tcp_sk(ssk)->nonagle, size_goal);
mptcp_clean_una(sk);
if (!sk_stream_memory_free(sk)) {
/* can't send more for now, need to wait for
* MPTCP-level ACKs from peer.
*
* Wakeup will happen via mptcp_clean_una().
*/
mptcp_set_timeout(sk, ssk);
release_sock(ssk);
goto wait_for_sndbuf;
}
}
} }
mptcp_set_timeout(sk, ssk); mptcp_set_timeout(sk, ssk);
...@@ -1094,7 +1155,7 @@ static void mptcp_worker(struct work_struct *work) ...@@ -1094,7 +1155,7 @@ static void mptcp_worker(struct work_struct *work)
{ {
struct mptcp_sock *msk = container_of(work, struct mptcp_sock, work); struct mptcp_sock *msk = container_of(work, struct mptcp_sock, work);
struct sock *ssk, *sk = &msk->sk.icsk_inet.sk; struct sock *ssk, *sk = &msk->sk.icsk_inet.sk;
int orig_len, orig_offset, ret, mss_now = 0, size_goal = 0; int orig_len, orig_offset, mss_now = 0, size_goal = 0;
struct mptcp_data_frag *dfrag; struct mptcp_data_frag *dfrag;
u64 orig_write_seq; u64 orig_write_seq;
size_t copied = 0; size_t copied = 0;
...@@ -1116,6 +1177,9 @@ static void mptcp_worker(struct work_struct *work) ...@@ -1116,6 +1177,9 @@ static void mptcp_worker(struct work_struct *work)
if (!dfrag) if (!dfrag)
goto unlock; goto unlock;
if (!mptcp_ext_cache_refill(msk))
goto reset_unlock;
ssk = mptcp_subflow_get_retrans(msk); ssk = mptcp_subflow_get_retrans(msk);
if (!ssk) if (!ssk)
goto reset_unlock; goto reset_unlock;
...@@ -1127,8 +1191,8 @@ static void mptcp_worker(struct work_struct *work) ...@@ -1127,8 +1191,8 @@ static void mptcp_worker(struct work_struct *work)
orig_offset = dfrag->offset; orig_offset = dfrag->offset;
orig_write_seq = dfrag->data_seq; orig_write_seq = dfrag->data_seq;
while (dfrag->data_len > 0) { while (dfrag->data_len > 0) {
ret = mptcp_sendmsg_frag(sk, ssk, &msg, dfrag, &timeo, &mss_now, int ret = mptcp_sendmsg_frag(sk, ssk, &msg, dfrag, &timeo,
&size_goal); &mss_now, &size_goal);
if (ret < 0) if (ret < 0)
break; break;
...@@ -1136,6 +1200,9 @@ static void mptcp_worker(struct work_struct *work) ...@@ -1136,6 +1200,9 @@ static void mptcp_worker(struct work_struct *work)
copied += ret; copied += ret;
dfrag->data_len -= ret; dfrag->data_len -= ret;
dfrag->offset += ret; dfrag->offset += ret;
if (!mptcp_ext_cache_refill(msk))
break;
} }
if (copied) if (copied)
tcp_push(ssk, msg.msg_flags, mss_now, tcp_sk(ssk)->nonagle, tcp_push(ssk, msg.msg_flags, mss_now, tcp_sk(ssk)->nonagle,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment