Commit 365ad353 authored by Jon Paul Maloy's avatar Jon Paul Maloy Committed by David S. Miller

tipc: reduce risk of user starvation during link congestion

The socket code currently handles link congestion by either blocking
and trying to send again when the congestion has abated, or just
returning to the user with -EAGAIN and letting the user retry later.

This mechanism is prone to starvation, because the wakeup algorithm is
non-atomic. During the time the link issues a wakeup signal, until the
socket wakes up and re-attempts sending, other senders may have come
in between and occupied the free buffer space in the link. This in turn
may lead to a socket having to make many send attempts before it is
successful. In extremely loaded systems we have observed latency times
of several seconds before a low-priority socket is able to send out a
message.

In this commit, we simplify this mechanism and reduce the risk of the
described scenario happening. When an attempt is made to send a message via a
congested link, we now let it be added to the link's backlog queue
anyway, thus permitting an oversubscription of one message per source
socket. We still create a wakeup item and return an error code, hence
instructing the sender to block or stop sending. Only when enough space
has been freed up in the link's backlog queue do we issue a wakeup event
that allows the sender to continue with the next message, if any.

The fact that a socket now can consider a message sent even when the
link returns a congestion code means that the sending socket code can
be simplified. Also, since this is a good opportunity to get rid of the
obsolete 'mtu change' condition in the three socket send functions, we
now choose to refactor those functions completely.
Signed-off-by: default avatarParthasarathy Bhuvaragan <parthasarathy.bhuvaragan@ericsson.com>
Acked-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 4d8642d8
...@@ -174,7 +174,7 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq) ...@@ -174,7 +174,7 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
* and to identified node local sockets * and to identified node local sockets
* @net: the applicable net namespace * @net: the applicable net namespace
* @list: chain of buffers containing message * @list: chain of buffers containing message
* Consumes the buffer chain, except when returning -ELINKCONG * Consumes the buffer chain.
* Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
*/ */
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
...@@ -197,7 +197,7 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) ...@@ -197,7 +197,7 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
tipc_bcast_unlock(net); tipc_bcast_unlock(net);
/* Don't send to local node if adding to link failed */ /* Don't send to local node if adding to link failed */
if (unlikely(rc)) { if (unlikely(rc && (rc != -ELINKCONG))) {
__skb_queue_purge(&rcvq); __skb_queue_purge(&rcvq);
return rc; return rc;
} }
...@@ -206,7 +206,7 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) ...@@ -206,7 +206,7 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
tipc_bcbase_xmit(net, &xmitq); tipc_bcbase_xmit(net, &xmitq);
tipc_sk_mcast_rcv(net, &rcvq, &inputq); tipc_sk_mcast_rcv(net, &rcvq, &inputq);
__skb_queue_purge(list); __skb_queue_purge(list);
return 0; return rc;
} }
/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link /* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
......
...@@ -776,60 +776,47 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) ...@@ -776,60 +776,47 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
/** /**
* link_schedule_user - schedule a message sender for wakeup after congestion * link_schedule_user - schedule a message sender for wakeup after congestion
* @link: congested link * @l: congested link
* @list: message that was attempted sent * @hdr: header of message that is being sent
* Create pseudo msg to send back to user when congestion abates * Create pseudo msg to send back to user when congestion abates
* Does not consume buffer list
*/ */
static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list) static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)
{ {
struct tipc_msg *msg = buf_msg(skb_peek(list)); u32 dnode = tipc_own_addr(l->net);
int imp = msg_importance(msg); u32 dport = msg_origport(hdr);
u32 oport = msg_origport(msg);
u32 addr = tipc_own_addr(link->net);
struct sk_buff *skb; struct sk_buff *skb;
/* This really cannot happen... */
if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
return -ENOBUFS;
}
/* Non-blocking sender: */
if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
return -ELINKCONG;
/* Create and schedule wakeup pseudo message */ /* Create and schedule wakeup pseudo message */
skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
addr, addr, oport, 0, 0); dnode, l->addr, dport, 0, 0);
if (!skb) if (!skb)
return -ENOBUFS; return -ENOBUFS;
TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list); msg_set_dest_droppable(buf_msg(skb), true);
TIPC_SKB_CB(skb)->chain_imp = imp; TIPC_SKB_CB(skb)->chain_imp = msg_importance(hdr);
skb_queue_tail(&link->wakeupq, skb); skb_queue_tail(&l->wakeupq, skb);
link->stats.link_congs++; l->stats.link_congs++;
return -ELINKCONG; return -ELINKCONG;
} }
/** /**
* link_prepare_wakeup - prepare users for wakeup after congestion * link_prepare_wakeup - prepare users for wakeup after congestion
* @link: congested link * @l: congested link
* Move a number of waiting users, as permitted by available space in * Wake up a number of waiting users, as permitted by available space
* the send queue, from link wait queue to node wait queue for wakeup * in the send queue
*/ */
void link_prepare_wakeup(struct tipc_link *l) void link_prepare_wakeup(struct tipc_link *l)
{ {
int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
int imp, lim;
struct sk_buff *skb, *tmp; struct sk_buff *skb, *tmp;
int imp, i = 0;
skb_queue_walk_safe(&l->wakeupq, skb, tmp) { skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
imp = TIPC_SKB_CB(skb)->chain_imp; imp = TIPC_SKB_CB(skb)->chain_imp;
lim = l->backlog[imp].limit; if (l->backlog[imp].len < l->backlog[imp].limit) {
pnd[imp] += TIPC_SKB_CB(skb)->chain_sz; skb_unlink(skb, &l->wakeupq);
if ((pnd[imp] + l->backlog[imp].len) >= lim) skb_queue_tail(l->inputq, skb);
} else if (i++ > 10) {
break; break;
skb_unlink(skb, &l->wakeupq); }
skb_queue_tail(l->inputq, skb);
} }
} }
...@@ -869,8 +856,7 @@ void tipc_link_reset(struct tipc_link *l) ...@@ -869,8 +856,7 @@ void tipc_link_reset(struct tipc_link *l)
* @list: chain of buffers containing message * @list: chain of buffers containing message
* @xmitq: returned list of packets to be sent by caller * @xmitq: returned list of packets to be sent by caller
* *
* Consumes the buffer chain, except when returning -ELINKCONG, * Consumes the buffer chain.
* since the caller then may want to make more send attempts.
* Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
* Messages at TIPC_SYSTEM_IMPORTANCE are always accepted * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
*/ */
...@@ -879,7 +865,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, ...@@ -879,7 +865,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
{ {
struct tipc_msg *hdr = buf_msg(skb_peek(list)); struct tipc_msg *hdr = buf_msg(skb_peek(list));
unsigned int maxwin = l->window; unsigned int maxwin = l->window;
unsigned int i, imp = msg_importance(hdr); int imp = msg_importance(hdr);
unsigned int mtu = l->mtu; unsigned int mtu = l->mtu;
u16 ack = l->rcv_nxt - 1; u16 ack = l->rcv_nxt - 1;
u16 seqno = l->snd_nxt; u16 seqno = l->snd_nxt;
...@@ -888,19 +874,22 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, ...@@ -888,19 +874,22 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
struct sk_buff_head *backlogq = &l->backlogq; struct sk_buff_head *backlogq = &l->backlogq;
struct sk_buff *skb, *_skb, *bskb; struct sk_buff *skb, *_skb, *bskb;
int pkt_cnt = skb_queue_len(list); int pkt_cnt = skb_queue_len(list);
int rc = 0;
/* Match msg importance against this and all higher backlog limits: */
if (!skb_queue_empty(backlogq)) {
for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
return link_schedule_user(l, list);
}
}
if (unlikely(msg_size(hdr) > mtu)) { if (unlikely(msg_size(hdr) > mtu)) {
skb_queue_purge(list); skb_queue_purge(list);
return -EMSGSIZE; return -EMSGSIZE;
} }
/* Allow oversubscription of one data msg per source at congestion */
if (unlikely(l->backlog[imp].len >= l->backlog[imp].limit)) {
if (imp == TIPC_SYSTEM_IMPORTANCE) {
pr_warn("%s<%s>, link overflow", link_rst_msg, l->name);
return -ENOBUFS;
}
rc = link_schedule_user(l, hdr);
}
if (pkt_cnt > 1) { if (pkt_cnt > 1) {
l->stats.sent_fragmented++; l->stats.sent_fragmented++;
l->stats.sent_fragments += pkt_cnt; l->stats.sent_fragments += pkt_cnt;
...@@ -946,7 +935,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, ...@@ -946,7 +935,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
skb_queue_splice_tail_init(list, backlogq); skb_queue_splice_tail_init(list, backlogq);
} }
l->snd_nxt = seqno; l->snd_nxt = seqno;
return 0; return rc;
} }
void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
......
...@@ -98,8 +98,6 @@ struct tipc_skb_cb { ...@@ -98,8 +98,6 @@ struct tipc_skb_cb {
u32 bytes_read; u32 bytes_read;
struct sk_buff *tail; struct sk_buff *tail;
bool validated; bool validated;
bool wakeup_pending;
u16 chain_sz;
u16 chain_imp; u16 chain_imp;
u16 ackers; u16 ackers;
}; };
......
...@@ -1167,7 +1167,7 @@ static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node) ...@@ -1167,7 +1167,7 @@ static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
* @list: chain of buffers containing message * @list: chain of buffers containing message
* @dnode: address of destination node * @dnode: address of destination node
* @selector: a number used for deterministic link selection * @selector: a number used for deterministic link selection
* Consumes the buffer chain, except when returning -ELINKCONG * Consumes the buffer chain.
* Returns 0 if success, otherwise: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE,-ENOBUFS * Returns 0 if success, otherwise: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE,-ENOBUFS
*/ */
int tipc_node_xmit(struct net *net, struct sk_buff_head *list, int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
...@@ -1206,10 +1206,10 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list, ...@@ -1206,10 +1206,10 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
spin_unlock_bh(&le->lock); spin_unlock_bh(&le->lock);
tipc_node_read_unlock(n); tipc_node_read_unlock(n);
if (likely(rc == 0)) if (unlikely(rc == -ENOBUFS))
tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
else if (rc == -ENOBUFS)
tipc_node_link_down(n, bearer_id, false); tipc_node_link_down(n, bearer_id, false);
else
tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
tipc_node_put(n); tipc_node_put(n);
...@@ -1221,20 +1221,15 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list, ...@@ -1221,20 +1221,15 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
* messages, which will not be rejected * messages, which will not be rejected
* The only exception is datagram messages rerouted after secondary * The only exception is datagram messages rerouted after secondary
* lookup, which are rare and safe to dispose of anyway. * lookup, which are rare and safe to dispose of anyway.
* TODO: Return real return value, and let callers use
* tipc_wait_for_sendpkt() where applicable
*/ */
int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
u32 selector) u32 selector)
{ {
struct sk_buff_head head; struct sk_buff_head head;
int rc;
skb_queue_head_init(&head); skb_queue_head_init(&head);
__skb_queue_tail(&head, skb); __skb_queue_tail(&head, skb);
rc = tipc_node_xmit(net, &head, dnode, selector); tipc_node_xmit(net, &head, dnode, selector);
if (rc == -ELINKCONG)
kfree_skb(skb);
return 0; return 0;
} }
......
...@@ -67,12 +67,14 @@ enum { ...@@ -67,12 +67,14 @@ enum {
* @max_pkt: maximum packet size "hint" used when building messages sent by port * @max_pkt: maximum packet size "hint" used when building messages sent by port
* @portid: unique port identity in TIPC socket hash table * @portid: unique port identity in TIPC socket hash table
* @phdr: preformatted message header used when sending messages * @phdr: preformatted message header used when sending messages
* @cong_links: list of congested links
* @publications: list of publications for port * @publications: list of publications for port
* @blocking_link: address of the congested link we are currently sleeping on
* @pub_count: total # of publications port has made during its lifetime * @pub_count: total # of publications port has made during its lifetime
* @probing_state: * @probing_state:
* @conn_timeout: the time we can wait for an unresponded setup request * @conn_timeout: the time we can wait for an unresponded setup request
* @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
* @link_cong: non-zero if owner must sleep because of link congestion * @cong_link_cnt: number of congested links
* @sent_unacked: # messages sent by socket, and not yet acked by peer * @sent_unacked: # messages sent by socket, and not yet acked by peer
* @rcv_unacked: # messages read by user, but not yet acked back to peer * @rcv_unacked: # messages read by user, but not yet acked back to peer
* @peer: 'connected' peer for dgram/rdm * @peer: 'connected' peer for dgram/rdm
...@@ -87,13 +89,13 @@ struct tipc_sock { ...@@ -87,13 +89,13 @@ struct tipc_sock {
u32 max_pkt; u32 max_pkt;
u32 portid; u32 portid;
struct tipc_msg phdr; struct tipc_msg phdr;
struct list_head sock_list; struct list_head cong_links;
struct list_head publications; struct list_head publications;
u32 pub_count; u32 pub_count;
uint conn_timeout; uint conn_timeout;
atomic_t dupl_rcvcnt; atomic_t dupl_rcvcnt;
bool probe_unacked; bool probe_unacked;
bool link_cong; u16 cong_link_cnt;
u16 snt_unacked; u16 snt_unacked;
u16 snd_win; u16 snd_win;
u16 peer_caps; u16 peer_caps;
...@@ -118,8 +120,7 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, ...@@ -118,8 +120,7 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid); static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
static int tipc_sk_insert(struct tipc_sock *tsk); static int tipc_sk_insert(struct tipc_sock *tsk);
static void tipc_sk_remove(struct tipc_sock *tsk); static void tipc_sk_remove(struct tipc_sock *tsk);
static int __tipc_send_stream(struct socket *sock, struct msghdr *m, static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
size_t dsz);
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz); static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
static const struct proto_ops packet_ops; static const struct proto_ops packet_ops;
...@@ -424,6 +425,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock, ...@@ -424,6 +425,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
tsk = tipc_sk(sk); tsk = tipc_sk(sk);
tsk->max_pkt = MAX_PKT_DEFAULT; tsk->max_pkt = MAX_PKT_DEFAULT;
INIT_LIST_HEAD(&tsk->publications); INIT_LIST_HEAD(&tsk->publications);
INIT_LIST_HEAD(&tsk->cong_links);
msg = &tsk->phdr; msg = &tsk->phdr;
tn = net_generic(sock_net(sk), tipc_net_id); tn = net_generic(sock_net(sk), tipc_net_id);
tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
...@@ -474,9 +476,14 @@ static void __tipc_shutdown(struct socket *sock, int error) ...@@ -474,9 +476,14 @@ static void __tipc_shutdown(struct socket *sock, int error)
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk); struct tipc_sock *tsk = tipc_sk(sk);
struct net *net = sock_net(sk); struct net *net = sock_net(sk);
long timeout = CONN_TIMEOUT_DEFAULT;
u32 dnode = tsk_peer_node(tsk); u32 dnode = tsk_peer_node(tsk);
struct sk_buff *skb; struct sk_buff *skb;
/* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
!tsk_conn_cong(tsk)));
/* Reject all unreceived messages, except on an active connection /* Reject all unreceived messages, except on an active connection
* (which disconnects locally & sends a 'FIN+' to peer). * (which disconnects locally & sends a 'FIN+' to peer).
*/ */
...@@ -547,7 +554,8 @@ static int tipc_release(struct socket *sock) ...@@ -547,7 +554,8 @@ static int tipc_release(struct socket *sock)
/* Reject any messages that accumulated in backlog queue */ /* Reject any messages that accumulated in backlog queue */
release_sock(sk); release_sock(sk);
u32_list_purge(&tsk->cong_links);
tsk->cong_link_cnt = 0;
call_rcu(&tsk->rcu, tipc_sk_callback); call_rcu(&tsk->rcu, tipc_sk_callback);
sock->sk = NULL; sock->sk = NULL;
...@@ -690,7 +698,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, ...@@ -690,7 +698,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
switch (sk->sk_state) { switch (sk->sk_state) {
case TIPC_ESTABLISHED: case TIPC_ESTABLISHED:
if (!tsk->link_cong && !tsk_conn_cong(tsk)) if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
mask |= POLLOUT; mask |= POLLOUT;
/* fall thru' */ /* fall thru' */
case TIPC_LISTEN: case TIPC_LISTEN:
...@@ -699,7 +707,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, ...@@ -699,7 +707,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
mask |= (POLLIN | POLLRDNORM); mask |= (POLLIN | POLLRDNORM);
break; break;
case TIPC_OPEN: case TIPC_OPEN:
if (!tsk->link_cong) if (!tsk->cong_link_cnt)
mask |= POLLOUT; mask |= POLLOUT;
if (tipc_sk_type_connectionless(sk) && if (tipc_sk_type_connectionless(sk) &&
(!skb_queue_empty(&sk->sk_receive_queue))) (!skb_queue_empty(&sk->sk_receive_queue)))
...@@ -718,63 +726,48 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, ...@@ -718,63 +726,48 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
* @sock: socket structure * @sock: socket structure
* @seq: destination address * @seq: destination address
* @msg: message to send * @msg: message to send
* @dsz: total length of message data * @dlen: length of data to send
* @timeo: timeout to wait for wakeup * @timeout: timeout to wait for wakeup
* *
* Called from function tipc_sendmsg(), which has done all sanity checks * Called from function tipc_sendmsg(), which has done all sanity checks
* Returns the number of bytes sent on success, or errno * Returns the number of bytes sent on success, or errno
*/ */
static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
struct msghdr *msg, size_t dsz, long timeo) struct msghdr *msg, size_t dlen, long timeout)
{ {
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk); struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = &tsk->phdr;
struct net *net = sock_net(sk); struct net *net = sock_net(sk);
struct tipc_msg *mhdr = &tsk->phdr; int mtu = tipc_bcast_get_mtu(net);
struct sk_buff_head pktchain; struct sk_buff_head pkts;
struct iov_iter save = msg->msg_iter;
uint mtu;
int rc; int rc;
if (!timeo && tsk->link_cong) rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
return -ELINKCONG; if (unlikely(rc))
return rc;
msg_set_type(mhdr, TIPC_MCAST_MSG);
msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
msg_set_destport(mhdr, 0);
msg_set_destnode(mhdr, 0);
msg_set_nametype(mhdr, seq->type);
msg_set_namelower(mhdr, seq->lower);
msg_set_nameupper(mhdr, seq->upper);
msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
skb_queue_head_init(&pktchain);
new_mtu: msg_set_type(hdr, TIPC_MCAST_MSG);
mtu = tipc_bcast_get_mtu(net); msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain); msg_set_destport(hdr, 0);
if (unlikely(rc < 0)) msg_set_destnode(hdr, 0);
msg_set_nametype(hdr, seq->type);
msg_set_namelower(hdr, seq->lower);
msg_set_nameupper(hdr, seq->upper);
msg_set_hdr_sz(hdr, MCAST_H_SIZE);
skb_queue_head_init(&pkts);
rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
if (unlikely(rc != dlen))
return rc; return rc;
do { rc = tipc_bcast_xmit(net, &pkts);
rc = tipc_bcast_xmit(net, &pktchain); if (unlikely(rc == -ELINKCONG)) {
if (likely(!rc)) tsk->cong_link_cnt = 1;
return dsz; rc = 0;
}
if (rc == -ELINKCONG) {
tsk->link_cong = 1; return rc ? rc : dlen;
rc = tipc_wait_for_cond(sock, &timeo, !tsk->link_cong);
if (!rc)
continue;
}
__skb_queue_purge(&pktchain);
if (rc == -EMSGSIZE) {
msg->msg_iter = save;
goto new_mtu;
}
break;
} while (1);
return rc;
} }
/** /**
...@@ -898,35 +891,38 @@ static int tipc_sendmsg(struct socket *sock, ...@@ -898,35 +891,38 @@ static int tipc_sendmsg(struct socket *sock,
return ret; return ret;
} }
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz) static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
{ {
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
struct net *net = sock_net(sk); struct net *net = sock_net(sk);
struct tipc_msg *mhdr = &tsk->phdr; struct tipc_sock *tsk = tipc_sk(sk);
u32 dnode, dport; DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
struct sk_buff_head pktchain; long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
bool is_connectionless = tipc_sk_type_connectionless(sk); struct list_head *clinks = &tsk->cong_links;
struct sk_buff *skb; bool syn = !tipc_sk_type_connectionless(sk);
struct tipc_msg *hdr = &tsk->phdr;
struct tipc_name_seq *seq; struct tipc_name_seq *seq;
struct iov_iter save; struct sk_buff_head pkts;
u32 mtu; u32 type, inst, domain;
long timeo; u32 dnode, dport;
int rc; int mtu, rc;
if (dsz > TIPC_MAX_USER_MSG_SIZE) if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
return -EMSGSIZE; return -EMSGSIZE;
if (unlikely(!dest)) { if (unlikely(!dest)) {
if (is_connectionless && tsk->peer.family == AF_TIPC) dest = &tsk->peer;
dest = &tsk->peer; if (!syn || dest->family != AF_TIPC)
else
return -EDESTADDRREQ; return -EDESTADDRREQ;
} else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
dest->family != AF_TIPC) {
return -EINVAL;
} }
if (!is_connectionless) {
if (unlikely(m->msg_namelen < sizeof(*dest)))
return -EINVAL;
if (unlikely(dest->family != AF_TIPC))
return -EINVAL;
if (unlikely(syn)) {
if (sk->sk_state == TIPC_LISTEN) if (sk->sk_state == TIPC_LISTEN)
return -EPIPE; return -EPIPE;
if (sk->sk_state != TIPC_OPEN) if (sk->sk_state != TIPC_OPEN)
...@@ -938,72 +934,62 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz) ...@@ -938,72 +934,62 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
tsk->conn_instance = dest->addr.name.name.instance; tsk->conn_instance = dest->addr.name.name.instance;
} }
} }
seq = &dest->addr.nameseq;
timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
if (dest->addrtype == TIPC_ADDR_MCAST) { seq = &dest->addr.nameseq;
return tipc_sendmcast(sock, seq, m, dsz, timeo); if (dest->addrtype == TIPC_ADDR_MCAST)
} else if (dest->addrtype == TIPC_ADDR_NAME) { return tipc_sendmcast(sock, seq, m, dlen, timeout);
u32 type = dest->addr.name.name.type;
u32 inst = dest->addr.name.name.instance;
u32 domain = dest->addr.name.domain;
if (dest->addrtype == TIPC_ADDR_NAME) {
type = dest->addr.name.name.type;
inst = dest->addr.name.name.instance;
domain = dest->addr.name.domain;
dnode = domain; dnode = domain;
msg_set_type(mhdr, TIPC_NAMED_MSG); msg_set_type(hdr, TIPC_NAMED_MSG);
msg_set_hdr_sz(mhdr, NAMED_H_SIZE); msg_set_hdr_sz(hdr, NAMED_H_SIZE);
msg_set_nametype(mhdr, type); msg_set_nametype(hdr, type);
msg_set_nameinst(mhdr, inst); msg_set_nameinst(hdr, inst);
msg_set_lookup_scope(mhdr, tipc_addr_scope(domain)); msg_set_lookup_scope(hdr, tipc_addr_scope(domain));
dport = tipc_nametbl_translate(net, type, inst, &dnode); dport = tipc_nametbl_translate(net, type, inst, &dnode);
msg_set_destnode(mhdr, dnode); msg_set_destnode(hdr, dnode);
msg_set_destport(mhdr, dport); msg_set_destport(hdr, dport);
if (unlikely(!dport && !dnode)) if (unlikely(!dport && !dnode))
return -EHOSTUNREACH; return -EHOSTUNREACH;
} else if (dest->addrtype == TIPC_ADDR_ID) { } else if (dest->addrtype == TIPC_ADDR_ID) {
dnode = dest->addr.id.node; dnode = dest->addr.id.node;
msg_set_type(mhdr, TIPC_DIRECT_MSG); msg_set_type(hdr, TIPC_DIRECT_MSG);
msg_set_lookup_scope(mhdr, 0); msg_set_lookup_scope(hdr, 0);
msg_set_destnode(mhdr, dnode); msg_set_destnode(hdr, dnode);
msg_set_destport(mhdr, dest->addr.id.ref); msg_set_destport(hdr, dest->addr.id.ref);
msg_set_hdr_sz(mhdr, BASIC_H_SIZE); msg_set_hdr_sz(hdr, BASIC_H_SIZE);
} }
skb_queue_head_init(&pktchain); /* Block or return if destination link is congested */
save = m->msg_iter; rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode));
new_mtu: if (unlikely(rc))
return rc;
skb_queue_head_init(&pkts);
mtu = tipc_node_get_mtu(net, dnode, tsk->portid); mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain); rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
if (rc < 0) if (unlikely(rc != dlen))
return rc; return rc;
do { rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
skb = skb_peek(&pktchain); if (unlikely(rc == -ELINKCONG)) {
TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; u32_push(clinks, dnode);
rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid); tsk->cong_link_cnt++;
if (likely(!rc)) { rc = 0;
if (!is_connectionless) }
tipc_set_sk_state(sk, TIPC_CONNECTING);
return dsz;
}
if (rc == -ELINKCONG) {
tsk->link_cong = 1;
rc = tipc_wait_for_cond(sock, &timeo, !tsk->link_cong);
if (!rc)
continue;
}
__skb_queue_purge(&pktchain);
if (rc == -EMSGSIZE) {
m->msg_iter = save;
goto new_mtu;
}
break;
} while (1);
return rc; if (unlikely(syn && !rc))
tipc_set_sk_state(sk, TIPC_CONNECTING);
return rc ? rc : dlen;
} }
/** /**
* tipc_send_stream - send stream-oriented data * tipc_sendstream - send stream-oriented data
* @sock: socket structure * @sock: socket structure
* @m: data to send * @m: data to send
* @dsz: total length of data to be transmitted * @dsz: total length of data to be transmitted
...@@ -1013,97 +999,69 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz) ...@@ -1013,97 +999,69 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
* Returns the number of bytes sent on success (or partial success), * Returns the number of bytes sent on success (or partial success),
* or errno if no data sent * or errno if no data sent
*/ */
static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
{ {
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
int ret; int ret;
lock_sock(sk); lock_sock(sk);
ret = __tipc_send_stream(sock, m, dsz); ret = __tipc_sendstream(sock, m, dsz);
release_sock(sk); release_sock(sk);
return ret; return ret;
} }
static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
{ {
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
struct net *net = sock_net(sk);
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *mhdr = &tsk->phdr;
struct sk_buff_head pktchain;
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
u32 portid = tsk->portid; long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
int rc = -EINVAL; struct tipc_sock *tsk = tipc_sk(sk);
long timeo; struct tipc_msg *hdr = &tsk->phdr;
u32 dnode; struct net *net = sock_net(sk);
uint mtu, send, sent = 0; struct sk_buff_head pkts;
struct iov_iter save; u32 dnode = tsk_peer_node(tsk);
int hlen = MIN_H_SIZE; int send, sent = 0;
int rc = 0;
/* Handle implied connection establishment */
if (unlikely(dest)) {
rc = __tipc_sendmsg(sock, m, dsz);
hlen = msg_hdr_sz(mhdr);
if (dsz && (dsz == rc))
tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
return rc;
}
if (dsz > (uint)INT_MAX)
return -EMSGSIZE;
if (unlikely(!tipc_sk_connected(sk))) {
if (sk->sk_state == TIPC_DISCONNECTING)
return -EPIPE;
else
return -ENOTCONN;
}
timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); skb_queue_head_init(&pkts);
if (!timeo && tsk->link_cong)
return -ELINKCONG;
dnode = tsk_peer_node(tsk); if (unlikely(dlen > INT_MAX))
skb_queue_head_init(&pktchain); return -EMSGSIZE;
next: /* Handle implicit connection setup */
save = m->msg_iter; if (unlikely(dest)) {
mtu = tsk->max_pkt; rc = __tipc_sendmsg(sock, m, dlen);
send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); if (dlen && (dlen == rc))
rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain); tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
if (unlikely(rc < 0))
return rc; return rc;
}
do { do {
if (likely(!tsk_conn_cong(tsk))) { rc = tipc_wait_for_cond(sock, &timeout,
rc = tipc_node_xmit(net, &pktchain, dnode, portid); (!tsk->cong_link_cnt &&
if (likely(!rc)) {
tsk->snt_unacked += tsk_inc(tsk, send + hlen);
sent += send;
if (sent == dsz)
return dsz;
goto next;
}
if (rc == -EMSGSIZE) {
__skb_queue_purge(&pktchain);
tsk->max_pkt = tipc_node_get_mtu(net, dnode,
portid);
m->msg_iter = save;
goto next;
}
if (rc != -ELINKCONG)
break;
tsk->link_cong = 1;
}
rc = tipc_wait_for_cond(sock, &timeo,
(!tsk->link_cong &&
!tsk_conn_cong(tsk) && !tsk_conn_cong(tsk) &&
tipc_sk_connected(sk))); tipc_sk_connected(sk)));
} while (!rc); if (unlikely(rc))
break;
send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
if (unlikely(rc != send))
break;
rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
if (unlikely(rc == -ELINKCONG)) {
tsk->cong_link_cnt = 1;
rc = 0;
}
if (likely(!rc)) {
tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
sent += send;
}
} while (sent < dlen && !rc);
__skb_queue_purge(&pktchain); return rc ? rc : sent;
return sent ? sent : rc;
} }
/** /**
...@@ -1121,7 +1079,7 @@ static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz) ...@@ -1121,7 +1079,7 @@ static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
if (dsz > TIPC_MAX_USER_MSG_SIZE) if (dsz > TIPC_MAX_USER_MSG_SIZE)
return -EMSGSIZE; return -EMSGSIZE;
return tipc_send_stream(sock, m, dsz); return tipc_sendstream(sock, m, dsz);
} }
/* tipc_sk_finish_conn - complete the setup of a connection /* tipc_sk_finish_conn - complete the setup of a connection
...@@ -1688,6 +1646,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb, ...@@ -1688,6 +1646,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
unsigned int limit = rcvbuf_limit(sk, skb); unsigned int limit = rcvbuf_limit(sk, skb);
int err = TIPC_OK; int err = TIPC_OK;
int usr = msg_user(hdr); int usr = msg_user(hdr);
u32 onode;
if (unlikely(msg_user(hdr) == CONN_MANAGER)) { if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
tipc_sk_proto_rcv(tsk, skb, xmitq); tipc_sk_proto_rcv(tsk, skb, xmitq);
...@@ -1695,8 +1654,10 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb, ...@@ -1695,8 +1654,10 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
} }
if (unlikely(usr == SOCK_WAKEUP)) { if (unlikely(usr == SOCK_WAKEUP)) {
onode = msg_orignode(hdr);
kfree_skb(skb); kfree_skb(skb);
tsk->link_cong = 0; u32_del(&tsk->cong_links, onode);
tsk->cong_link_cnt--;
sk->sk_write_space(sk); sk->sk_write_space(sk);
return false; return false;
} }
...@@ -2104,7 +2065,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags) ...@@ -2104,7 +2065,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
struct msghdr m = {NULL,}; struct msghdr m = {NULL,};
tsk_advance_rx_queue(sk); tsk_advance_rx_queue(sk);
__tipc_send_stream(new_sock, &m, 0); __tipc_sendstream(new_sock, &m, 0);
} else { } else {
__skb_dequeue(&sk->sk_receive_queue); __skb_dequeue(&sk->sk_receive_queue);
__skb_queue_head(&new_sk->sk_receive_queue, buf); __skb_queue_head(&new_sk->sk_receive_queue, buf);
...@@ -2565,7 +2526,7 @@ static const struct proto_ops stream_ops = { ...@@ -2565,7 +2526,7 @@ static const struct proto_ops stream_ops = {
.shutdown = tipc_shutdown, .shutdown = tipc_shutdown,
.setsockopt = tipc_setsockopt, .setsockopt = tipc_setsockopt,
.getsockopt = tipc_getsockopt, .getsockopt = tipc_getsockopt,
.sendmsg = tipc_send_stream, .sendmsg = tipc_sendstream,
.recvmsg = tipc_recv_stream, .recvmsg = tipc_recv_stream,
.mmap = sock_no_mmap, .mmap = sock_no_mmap,
.sendpage = sock_no_sendpage .sendpage = sock_no_sendpage
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment