Commit a88eb6be authored by David S. Miller

Merge branch 'tipc-link-starvation'

Jon Maloy says:

====================
tipc: improve interaction socket-link

We fix a very real starvation problem that may occur when a link
encounters send buffer congestion. At the same time we make the
interaction between the socket and link layer simpler and more
consistent.
====================
Signed-off-by: David S. Miller <davem@davemloft.net>
parents aa276dd7 365ad353
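
The core policy change of the series shows up in tipc_link_xmit() below: when the backlog for a message's importance class is full, the message is no longer bounced back to the socket for a retry; it is queued anyway ("oversubscription" of one message per sender) and -ELINKCONG tells the socket to mark that destination congested and sleep until a SOCK_WAKEUP arrives. A standalone toy model of that admission rule, with purely illustrative names and limits (not kernel code):

/*
 * Toy model (plain userland C, illustrative names only) of the new
 * admission rule in tipc_link_xmit(): a message whose importance class
 * has a full backlog is still queued once, but the sender is told
 * -ELINKCONG so it can back off until it is woken up again.
 */
#include <stdio.h>
#include <errno.h>

#define ELINKCONG 113	/* placeholder; the kernel defines its own value */

enum imp { LOW, MEDIUM, HIGH, CRITICAL, SYSTEM, IMP_CNT };

struct toy_link {
	int backlog_len[IMP_CNT];
	int backlog_lim[IMP_CNT];
};

/* 0: queued; -ELINKCONG: queued, but sender must back off; -ENOBUFS: dropped */
static int toy_link_xmit(struct toy_link *l, enum imp imp)
{
	int rc = 0;

	if (l->backlog_len[imp] >= l->backlog_lim[imp]) {
		if (imp == SYSTEM)
			return -ENOBUFS;	/* hard overflow, message dropped */
		rc = -ELINKCONG;		/* a wakeup will be scheduled for this sender */
	}
	l->backlog_len[imp]++;			/* the message is accepted in any case */
	return rc;
}

int main(void)
{
	struct toy_link l = { .backlog_lim = { 2, 2, 2, 2, 2 } };
	int i, rc;

	for (i = 0; i < 4; i++) {
		rc = toy_link_xmit(&l, LOW);
		printf("msg %d: rc=%d, backlog=%d\n", i, rc, l.backlog_len[LOW]);
		if (rc == -ELINKCONG)
			break;	/* the socket would now block until SOCK_WAKEUP */
	}
	return 0;
}

Running the model shows the third LOW message being queued while its sender is told to back off, which is the situation __tipc_sendmsg() below handles by pushing dnode onto tsk->cong_links and reporting success to the user.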
......@@ -174,7 +174,7 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
* and to identified node local sockets
* @net: the applicable net namespace
* @list: chain of buffers containing message
* Consumes the buffer chain, except when returning -ELINKCONG
* Consumes the buffer chain.
* Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
*/
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
......@@ -197,7 +197,7 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
tipc_bcast_unlock(net);
/* Don't send to local node if adding to link failed */
if (unlikely(rc)) {
if (unlikely(rc && (rc != -ELINKCONG))) {
__skb_queue_purge(&rcvq);
return rc;
}
......@@ -206,7 +206,7 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
tipc_bcbase_xmit(net, &xmitq);
tipc_sk_mcast_rcv(net, &rcvq, &inputq);
__skb_queue_purge(list);
return 0;
return rc;
}
/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
......
......@@ -776,60 +776,47 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
/**
* link_schedule_user - schedule a message sender for wakeup after congestion
* @link: congested link
* @list: message that was attempted sent
* @l: congested link
* @hdr: header of message that is being sent
* Create pseudo msg to send back to user when congestion abates
* Does not consume buffer list
*/
static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)
{
struct tipc_msg *msg = buf_msg(skb_peek(list));
int imp = msg_importance(msg);
u32 oport = msg_origport(msg);
u32 addr = tipc_own_addr(link->net);
u32 dnode = tipc_own_addr(l->net);
u32 dport = msg_origport(hdr);
struct sk_buff *skb;
/* This really cannot happen... */
if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
return -ENOBUFS;
}
/* Non-blocking sender: */
if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
return -ELINKCONG;
/* Create and schedule wakeup pseudo message */
skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
addr, addr, oport, 0, 0);
dnode, l->addr, dport, 0, 0);
if (!skb)
return -ENOBUFS;
TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
TIPC_SKB_CB(skb)->chain_imp = imp;
skb_queue_tail(&link->wakeupq, skb);
link->stats.link_congs++;
msg_set_dest_droppable(buf_msg(skb), true);
TIPC_SKB_CB(skb)->chain_imp = msg_importance(hdr);
skb_queue_tail(&l->wakeupq, skb);
l->stats.link_congs++;
return -ELINKCONG;
}
/**
* link_prepare_wakeup - prepare users for wakeup after congestion
* @link: congested link
* Move a number of waiting users, as permitted by available space in
* the send queue, from link wait queue to node wait queue for wakeup
* @l: congested link
* Wake up a number of waiting users, as permitted by available space
* in the send queue
*/
void link_prepare_wakeup(struct tipc_link *l)
{
int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
int imp, lim;
struct sk_buff *skb, *tmp;
int imp, i = 0;
skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
imp = TIPC_SKB_CB(skb)->chain_imp;
lim = l->backlog[imp].limit;
pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
if ((pnd[imp] + l->backlog[imp].len) >= lim)
if (l->backlog[imp].len < l->backlog[imp].limit) {
skb_unlink(skb, &l->wakeupq);
skb_queue_tail(l->inputq, skb);
} else if (i++ > 10) {
break;
skb_unlink(skb, &l->wakeupq);
skb_queue_tail(l->inputq, skb);
}
}
}
......@@ -869,8 +856,7 @@ void tipc_link_reset(struct tipc_link *l)
* @list: chain of buffers containing message
* @xmitq: returned list of packets to be sent by caller
*
* Consumes the buffer chain, except when returning -ELINKCONG,
* since the caller then may want to make more send attempts.
* Consumes the buffer chain.
* Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
* Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
*/
......@@ -879,7 +865,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
{
struct tipc_msg *hdr = buf_msg(skb_peek(list));
unsigned int maxwin = l->window;
unsigned int i, imp = msg_importance(hdr);
int imp = msg_importance(hdr);
unsigned int mtu = l->mtu;
u16 ack = l->rcv_nxt - 1;
u16 seqno = l->snd_nxt;
......@@ -888,19 +874,22 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
struct sk_buff_head *backlogq = &l->backlogq;
struct sk_buff *skb, *_skb, *bskb;
int pkt_cnt = skb_queue_len(list);
int rc = 0;
/* Match msg importance against this and all higher backlog limits: */
if (!skb_queue_empty(backlogq)) {
for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
return link_schedule_user(l, list);
}
}
if (unlikely(msg_size(hdr) > mtu)) {
skb_queue_purge(list);
return -EMSGSIZE;
}
/* Allow oversubscription of one data msg per source at congestion */
if (unlikely(l->backlog[imp].len >= l->backlog[imp].limit)) {
if (imp == TIPC_SYSTEM_IMPORTANCE) {
pr_warn("%s<%s>, link overflow", link_rst_msg, l->name);
return -ENOBUFS;
}
rc = link_schedule_user(l, hdr);
}
if (pkt_cnt > 1) {
l->stats.sent_fragmented++;
l->stats.sent_fragments += pkt_cnt;
......@@ -946,7 +935,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
skb_queue_splice_tail_init(list, backlogq);
}
l->snd_nxt = seqno;
return 0;
return rc;
}
void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
......
......@@ -98,8 +98,6 @@ struct tipc_skb_cb {
u32 bytes_read;
struct sk_buff *tail;
bool validated;
bool wakeup_pending;
u16 chain_sz;
u16 chain_imp;
u16 ackers;
};
......
......@@ -608,7 +608,7 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance,
* Returns non-zero if any off-node ports overlap
*/
int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
u32 limit, struct tipc_plist *dports)
u32 limit, struct list_head *dports)
{
struct name_seq *seq;
struct sub_seq *sseq;
......@@ -633,7 +633,7 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
info = sseq->info;
list_for_each_entry(publ, &info->node_list, node_list) {
if (publ->scope <= limit)
tipc_plist_push(dports, publ->ref);
u32_push(dports, publ->ref);
}
if (info->cluster_list_size != info->node_list_size)
......@@ -1022,40 +1022,84 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
return skb->len;
}
void tipc_plist_push(struct tipc_plist *pl, u32 port)
struct u32_item {
struct list_head list;
u32 value;
};
bool u32_find(struct list_head *l, u32 value)
{
struct tipc_plist *nl;
struct u32_item *item;
if (likely(!pl->port)) {
pl->port = port;
return;
list_for_each_entry(item, l, list) {
if (item->value == value)
return true;
}
if (pl->port == port)
return;
list_for_each_entry(nl, &pl->list, list) {
if (nl->port == port)
return;
return false;
}
bool u32_push(struct list_head *l, u32 value)
{
struct u32_item *item;
list_for_each_entry(item, l, list) {
if (item->value == value)
return false;
}
item = kmalloc(sizeof(*item), GFP_ATOMIC);
if (unlikely(!item))
return false;
item->value = value;
list_add(&item->list, l);
return true;
}
u32 u32_pop(struct list_head *l)
{
struct u32_item *item;
u32 value = 0;
if (list_empty(l))
return 0;
item = list_first_entry(l, typeof(*item), list);
value = item->value;
list_del(&item->list);
kfree(item);
return value;
}
bool u32_del(struct list_head *l, u32 value)
{
struct u32_item *item, *tmp;
list_for_each_entry_safe(item, tmp, l, list) {
if (item->value != value)
continue;
list_del(&item->list);
kfree(item);
return true;
}
nl = kmalloc(sizeof(*nl), GFP_ATOMIC);
if (nl) {
nl->port = port;
list_add(&nl->list, &pl->list);
return false;
}
void u32_list_purge(struct list_head *l)
{
struct u32_item *item, *tmp;
list_for_each_entry_safe(item, tmp, l, list) {
list_del(&item->list);
kfree(item);
}
}
u32 tipc_plist_pop(struct tipc_plist *pl)
int u32_list_len(struct list_head *l)
{
struct tipc_plist *nl;
u32 port = 0;
struct u32_item *item;
int i = 0;
if (likely(list_empty(&pl->list))) {
port = pl->port;
pl->port = 0;
return port;
list_for_each_entry(item, l, list) {
i++;
}
nl = list_first_entry(&pl->list, typeof(*nl), list);
port = nl->port;
list_del(&nl->list);
kfree(nl);
return port;
return i;
}
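
The u32_item helpers above replace the old two-slot tipc_plist with a plain list_head of 32-bit values; in this series one such list carries the multicast destination ports in tipc_sk_mcast_rcv() and another the congested destination nodes per socket. A rough kernel-style sketch of how they combine (not a complete function):

	LIST_HEAD(dports);
	u32 port;

	u32_push(&dports, 100);		/* returns false on a duplicate or allocation failure */
	u32_push(&dports, 200);

	while ((port = u32_pop(&dports)))	/* u32_pop() returns 0 when the list is empty */
		pr_info("deliver to port %u\n", port);

	u32_list_purge(&dports);	/* frees anything still queued */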
......@@ -99,7 +99,7 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node);
int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
u32 limit, struct tipc_plist *dports);
u32 limit, struct list_head *dports);
struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
u32 upper, u32 scope, u32 port_ref,
u32 key);
......@@ -116,18 +116,11 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s);
int tipc_nametbl_init(struct net *net);
void tipc_nametbl_stop(struct net *net);
struct tipc_plist {
struct list_head list;
u32 port;
};
static inline void tipc_plist_init(struct tipc_plist *pl)
{
INIT_LIST_HEAD(&pl->list);
pl->port = 0;
}
void tipc_plist_push(struct tipc_plist *pl, u32 port);
u32 tipc_plist_pop(struct tipc_plist *pl);
bool u32_push(struct list_head *l, u32 value);
u32 u32_pop(struct list_head *l);
bool u32_find(struct list_head *l, u32 value);
bool u32_del(struct list_head *l, u32 value);
void u32_list_purge(struct list_head *l);
int u32_list_len(struct list_head *l);
#endif
......@@ -1167,7 +1167,7 @@ static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
* @list: chain of buffers containing message
* @dnode: address of destination node
* @selector: a number used for deterministic link selection
* Consumes the buffer chain, except when returning -ELINKCONG
* Consumes the buffer chain.
* Returns 0 if success, otherwise: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE,-ENOBUFS
*/
int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
......@@ -1206,10 +1206,10 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
spin_unlock_bh(&le->lock);
tipc_node_read_unlock(n);
if (likely(rc == 0))
tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
else if (rc == -ENOBUFS)
if (unlikely(rc == -ENOBUFS))
tipc_node_link_down(n, bearer_id, false);
else
tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
tipc_node_put(n);
......@@ -1221,20 +1221,15 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
* messages, which will not be rejected
* The only exception is datagram messages rerouted after secondary
* lookup, which are rare and safe to dispose of anyway.
* TODO: Return real return value, and let callers use
* tipc_wait_for_sendpkt() where applicable
*/
int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
u32 selector)
{
struct sk_buff_head head;
int rc;
skb_queue_head_init(&head);
__skb_queue_tail(&head, skb);
rc = tipc_node_xmit(net, &head, dnode, selector);
if (rc == -ELINKCONG)
kfree_skb(skb);
tipc_node_xmit(net, &head, dnode, selector);
return 0;
}
......
......@@ -67,12 +67,14 @@ enum {
* @max_pkt: maximum packet size "hint" used when building messages sent by port
* @portid: unique port identity in TIPC socket hash table
* @phdr: preformatted message header used when sending messages
* @cong_links: list of congested links
* @publications: list of publications for port
* @blocking_link: address of the congested link we are currently sleeping on
* @pub_count: total # of publications port has made during its lifetime
* @probing_state:
* @conn_timeout: the time we can wait for an unresponded setup request
* @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
* @link_cong: non-zero if owner must sleep because of link congestion
* @cong_link_cnt: number of congested links
* @sent_unacked: # messages sent by socket, and not yet acked by peer
* @rcv_unacked: # messages read by user, but not yet acked back to peer
* @peer: 'connected' peer for dgram/rdm
......@@ -87,13 +89,13 @@ struct tipc_sock {
u32 max_pkt;
u32 portid;
struct tipc_msg phdr;
struct list_head sock_list;
struct list_head cong_links;
struct list_head publications;
u32 pub_count;
uint conn_timeout;
atomic_t dupl_rcvcnt;
bool probe_unacked;
bool link_cong;
u16 cong_link_cnt;
u16 snt_unacked;
u16 snd_win;
u16 peer_caps;
......@@ -110,7 +112,6 @@ static void tipc_write_space(struct sock *sk);
static void tipc_sock_destruct(struct sock *sk);
static int tipc_release(struct socket *sock);
static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
static void tipc_sk_timeout(unsigned long data);
static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
struct tipc_name_seq const *seq);
......@@ -119,8 +120,7 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
static int tipc_sk_insert(struct tipc_sock *tsk);
static void tipc_sk_remove(struct tipc_sock *tsk);
static int __tipc_send_stream(struct socket *sock, struct msghdr *m,
size_t dsz);
static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
static const struct proto_ops packet_ops;
......@@ -334,6 +334,49 @@ static int tipc_set_sk_state(struct sock *sk, int state)
return res;
}
static int tipc_sk_sock_err(struct socket *sock, long *timeout)
{
struct sock *sk = sock->sk;
int err = sock_error(sk);
int typ = sock->type;
if (err)
return err;
if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) {
if (sk->sk_state == TIPC_DISCONNECTING)
return -EPIPE;
else if (!tipc_sk_connected(sk))
return -ENOTCONN;
}
if (!*timeout)
return -EAGAIN;
if (signal_pending(current))
return sock_intr_errno(*timeout);
return 0;
}
#define tipc_wait_for_cond(sock_, timeout_, condition_) \
({ \
int rc_ = 0; \
int done_ = 0; \
\
while (!(condition_) && !done_) { \
struct sock *sk_ = sock->sk; \
DEFINE_WAIT_FUNC(wait_, woken_wake_function); \
\
rc_ = tipc_sk_sock_err(sock_, timeout_); \
if (rc_) \
break; \
prepare_to_wait(sk_sleep(sk_), &wait_, \
TASK_INTERRUPTIBLE); \
done_ = sk_wait_event(sk_, timeout_, \
(condition_), &wait_); \
remove_wait_queue(sk_sleep(sk_), &wait_); \
} \
rc_; \
})
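/* Example use of tipc_wait_for_cond(), as in __tipc_sendmsg() further down:
 *
 *	rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode));
 *	if (unlikely(rc))
 *		return rc;
 *
 * The condition is re-evaluated after each wakeup; rc becomes non-zero on a
 * socket error, an expired timeout or a pending signal (see tipc_sk_sock_err()).
 */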
/**
* tipc_sk_create - create a TIPC socket
* @net: network namespace (must be default network)
......@@ -382,6 +425,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
tsk = tipc_sk(sk);
tsk->max_pkt = MAX_PKT_DEFAULT;
INIT_LIST_HEAD(&tsk->publications);
INIT_LIST_HEAD(&tsk->cong_links);
msg = &tsk->phdr;
tn = net_generic(sock_net(sk), tipc_net_id);
tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
......@@ -432,9 +476,14 @@ static void __tipc_shutdown(struct socket *sock, int error)
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
struct net *net = sock_net(sk);
long timeout = CONN_TIMEOUT_DEFAULT;
u32 dnode = tsk_peer_node(tsk);
struct sk_buff *skb;
/* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
!tsk_conn_cong(tsk)));
/* Reject all unreceived messages, except on an active connection
* (which disconnects locally & sends a 'FIN+' to peer).
*/
......@@ -505,7 +554,8 @@ static int tipc_release(struct socket *sock)
/* Reject any messages that accumulated in backlog queue */
release_sock(sk);
u32_list_purge(&tsk->cong_links);
tsk->cong_link_cnt = 0;
call_rcu(&tsk->rcu, tipc_sk_callback);
sock->sk = NULL;
......@@ -648,7 +698,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
switch (sk->sk_state) {
case TIPC_ESTABLISHED:
if (!tsk->link_cong && !tsk_conn_cong(tsk))
if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
mask |= POLLOUT;
/* fall thru' */
case TIPC_LISTEN:
......@@ -657,7 +707,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
mask |= (POLLIN | POLLRDNORM);
break;
case TIPC_OPEN:
if (!tsk->link_cong)
if (!tsk->cong_link_cnt)
mask |= POLLOUT;
if (tipc_sk_type_connectionless(sk) &&
(!skb_queue_empty(&sk->sk_receive_queue)))
......@@ -676,63 +726,48 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
* @sock: socket structure
* @seq: destination address
* @msg: message to send
* @dsz: total length of message data
* @timeo: timeout to wait for wakeup
* @dlen: length of data to send
* @timeout: timeout to wait for wakeup
*
* Called from function tipc_sendmsg(), which has done all sanity checks
* Returns the number of bytes sent on success, or errno
*/
static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
struct msghdr *msg, size_t dsz, long timeo)
struct msghdr *msg, size_t dlen, long timeout)
{
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = &tsk->phdr;
struct net *net = sock_net(sk);
struct tipc_msg *mhdr = &tsk->phdr;
struct sk_buff_head pktchain;
struct iov_iter save = msg->msg_iter;
uint mtu;
int mtu = tipc_bcast_get_mtu(net);
struct sk_buff_head pkts;
int rc;
if (!timeo && tsk->link_cong)
return -ELINKCONG;
msg_set_type(mhdr, TIPC_MCAST_MSG);
msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
msg_set_destport(mhdr, 0);
msg_set_destnode(mhdr, 0);
msg_set_nametype(mhdr, seq->type);
msg_set_namelower(mhdr, seq->lower);
msg_set_nameupper(mhdr, seq->upper);
msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
skb_queue_head_init(&pktchain);
rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
if (unlikely(rc))
return rc;
new_mtu:
mtu = tipc_bcast_get_mtu(net);
rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain);
if (unlikely(rc < 0))
msg_set_type(hdr, TIPC_MCAST_MSG);
msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
msg_set_destport(hdr, 0);
msg_set_destnode(hdr, 0);
msg_set_nametype(hdr, seq->type);
msg_set_namelower(hdr, seq->lower);
msg_set_nameupper(hdr, seq->upper);
msg_set_hdr_sz(hdr, MCAST_H_SIZE);
skb_queue_head_init(&pkts);
rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
if (unlikely(rc != dlen))
return rc;
do {
rc = tipc_bcast_xmit(net, &pktchain);
if (likely(!rc))
return dsz;
if (rc == -ELINKCONG) {
tsk->link_cong = 1;
rc = tipc_wait_for_sndmsg(sock, &timeo);
if (!rc)
continue;
}
__skb_queue_purge(&pktchain);
if (rc == -EMSGSIZE) {
msg->msg_iter = save;
goto new_mtu;
}
break;
} while (1);
return rc;
rc = tipc_bcast_xmit(net, &pkts);
if (unlikely(rc == -ELINKCONG)) {
tsk->cong_link_cnt = 1;
rc = 0;
}
return rc ? rc : dlen;
}
/**
......@@ -746,7 +781,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
struct sk_buff_head *inputq)
{
struct tipc_msg *msg;
struct tipc_plist dports;
struct list_head dports;
u32 portid;
u32 scope = TIPC_CLUSTER_SCOPE;
struct sk_buff_head tmpq;
......@@ -754,7 +789,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
struct sk_buff *skb, *_skb;
__skb_queue_head_init(&tmpq);
tipc_plist_init(&dports);
INIT_LIST_HEAD(&dports);
skb = tipc_skb_peek(arrvq, &inputq->lock);
for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
......@@ -768,8 +803,8 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
tipc_nametbl_mc_translate(net,
msg_nametype(msg), msg_namelower(msg),
msg_nameupper(msg), scope, &dports);
portid = tipc_plist_pop(&dports);
for (; portid; portid = tipc_plist_pop(&dports)) {
portid = u32_pop(&dports);
for (; portid; portid = u32_pop(&dports)) {
_skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
if (_skb) {
msg_set_destport(buf_msg(_skb), portid);
......@@ -830,31 +865,6 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
kfree_skb(skb);
}
static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
{
DEFINE_WAIT_FUNC(wait, woken_wake_function);
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
int done;
do {
int err = sock_error(sk);
if (err)
return err;
if (sk->sk_shutdown & SEND_SHUTDOWN)
return -EPIPE;
if (!*timeo_p)
return -EAGAIN;
if (signal_pending(current))
return sock_intr_errno(*timeo_p);
add_wait_queue(sk_sleep(sk), &wait);
done = sk_wait_event(sk, timeo_p, !tsk->link_cong, &wait);
remove_wait_queue(sk_sleep(sk), &wait);
} while (!done);
return 0;
}
/**
* tipc_sendmsg - send message in connectionless manner
* @sock: socket structure
......@@ -881,35 +891,38 @@ static int tipc_sendmsg(struct socket *sock,
return ret;
}
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
{
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
struct net *net = sock_net(sk);
struct tipc_msg *mhdr = &tsk->phdr;
u32 dnode, dport;
struct sk_buff_head pktchain;
bool is_connectionless = tipc_sk_type_connectionless(sk);
struct sk_buff *skb;
struct tipc_sock *tsk = tipc_sk(sk);
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
struct list_head *clinks = &tsk->cong_links;
bool syn = !tipc_sk_type_connectionless(sk);
struct tipc_msg *hdr = &tsk->phdr;
struct tipc_name_seq *seq;
struct iov_iter save;
u32 mtu;
long timeo;
int rc;
struct sk_buff_head pkts;
u32 type, inst, domain;
u32 dnode, dport;
int mtu, rc;
if (dsz > TIPC_MAX_USER_MSG_SIZE)
if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
return -EMSGSIZE;
if (unlikely(!dest)) {
if (is_connectionless && tsk->peer.family == AF_TIPC)
dest = &tsk->peer;
else
dest = &tsk->peer;
if (!syn || dest->family != AF_TIPC)
return -EDESTADDRREQ;
} else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
dest->family != AF_TIPC) {
return -EINVAL;
}
if (!is_connectionless) {
if (unlikely(m->msg_namelen < sizeof(*dest)))
return -EINVAL;
if (unlikely(dest->family != AF_TIPC))
return -EINVAL;
if (unlikely(syn)) {
if (sk->sk_state == TIPC_LISTEN)
return -EPIPE;
if (sk->sk_state != TIPC_OPEN)
......@@ -921,102 +934,62 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
tsk->conn_instance = dest->addr.name.name.instance;
}
}
seq = &dest->addr.nameseq;
timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
if (dest->addrtype == TIPC_ADDR_MCAST) {
return tipc_sendmcast(sock, seq, m, dsz, timeo);
} else if (dest->addrtype == TIPC_ADDR_NAME) {
u32 type = dest->addr.name.name.type;
u32 inst = dest->addr.name.name.instance;
u32 domain = dest->addr.name.domain;
seq = &dest->addr.nameseq;
if (dest->addrtype == TIPC_ADDR_MCAST)
return tipc_sendmcast(sock, seq, m, dlen, timeout);
if (dest->addrtype == TIPC_ADDR_NAME) {
type = dest->addr.name.name.type;
inst = dest->addr.name.name.instance;
domain = dest->addr.name.domain;
dnode = domain;
msg_set_type(mhdr, TIPC_NAMED_MSG);
msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
msg_set_nametype(mhdr, type);
msg_set_nameinst(mhdr, inst);
msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
msg_set_type(hdr, TIPC_NAMED_MSG);
msg_set_hdr_sz(hdr, NAMED_H_SIZE);
msg_set_nametype(hdr, type);
msg_set_nameinst(hdr, inst);
msg_set_lookup_scope(hdr, tipc_addr_scope(domain));
dport = tipc_nametbl_translate(net, type, inst, &dnode);
msg_set_destnode(mhdr, dnode);
msg_set_destport(mhdr, dport);
msg_set_destnode(hdr, dnode);
msg_set_destport(hdr, dport);
if (unlikely(!dport && !dnode))
return -EHOSTUNREACH;
} else if (dest->addrtype == TIPC_ADDR_ID) {
dnode = dest->addr.id.node;
msg_set_type(mhdr, TIPC_DIRECT_MSG);
msg_set_lookup_scope(mhdr, 0);
msg_set_destnode(mhdr, dnode);
msg_set_destport(mhdr, dest->addr.id.ref);
msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
msg_set_type(hdr, TIPC_DIRECT_MSG);
msg_set_lookup_scope(hdr, 0);
msg_set_destnode(hdr, dnode);
msg_set_destport(hdr, dest->addr.id.ref);
msg_set_hdr_sz(hdr, BASIC_H_SIZE);
}
skb_queue_head_init(&pktchain);
save = m->msg_iter;
new_mtu:
mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain);
if (rc < 0)
/* Block or return if destination link is congested */
rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode));
if (unlikely(rc))
return rc;
do {
skb = skb_peek(&pktchain);
TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid);
if (likely(!rc)) {
if (!is_connectionless)
tipc_set_sk_state(sk, TIPC_CONNECTING);
return dsz;
}
if (rc == -ELINKCONG) {
tsk->link_cong = 1;
rc = tipc_wait_for_sndmsg(sock, &timeo);
if (!rc)
continue;
}
__skb_queue_purge(&pktchain);
if (rc == -EMSGSIZE) {
m->msg_iter = save;
goto new_mtu;
}
break;
} while (1);
return rc;
}
skb_queue_head_init(&pkts);
mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
if (unlikely(rc != dlen))
return rc;
static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
{
DEFINE_WAIT_FUNC(wait, woken_wake_function);
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
int done;
rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
if (unlikely(rc == -ELINKCONG)) {
u32_push(clinks, dnode);
tsk->cong_link_cnt++;
rc = 0;
}
do {
int err = sock_error(sk);
if (err)
return err;
if (sk->sk_state == TIPC_DISCONNECTING)
return -EPIPE;
else if (!tipc_sk_connected(sk))
return -ENOTCONN;
if (!*timeo_p)
return -EAGAIN;
if (signal_pending(current))
return sock_intr_errno(*timeo_p);
if (unlikely(syn && !rc))
tipc_set_sk_state(sk, TIPC_CONNECTING);
add_wait_queue(sk_sleep(sk), &wait);
done = sk_wait_event(sk, timeo_p,
(!tsk->link_cong &&
!tsk_conn_cong(tsk)) ||
!tipc_sk_connected(sk), &wait);
remove_wait_queue(sk_sleep(sk), &wait);
} while (!done);
return 0;
return rc ? rc : dlen;
}
/**
* tipc_send_stream - send stream-oriented data
* tipc_sendstream - send stream-oriented data
* @sock: socket structure
* @m: data to send
* @dsz: total length of data to be transmitted
......@@ -1026,94 +999,69 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
* Returns the number of bytes sent on success (or partial success),
* or errno if no data sent
*/
static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
{
struct sock *sk = sock->sk;
int ret;
lock_sock(sk);
ret = __tipc_send_stream(sock, m, dsz);
ret = __tipc_sendstream(sock, m, dsz);
release_sock(sk);
return ret;
}
static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
{
struct sock *sk = sock->sk;
struct net *net = sock_net(sk);
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *mhdr = &tsk->phdr;
struct sk_buff_head pktchain;
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
u32 portid = tsk->portid;
int rc = -EINVAL;
long timeo;
u32 dnode;
uint mtu, send, sent = 0;
struct iov_iter save;
int hlen = MIN_H_SIZE;
/* Handle implied connection establishment */
if (unlikely(dest)) {
rc = __tipc_sendmsg(sock, m, dsz);
hlen = msg_hdr_sz(mhdr);
if (dsz && (dsz == rc))
tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
return rc;
}
if (dsz > (uint)INT_MAX)
return -EMSGSIZE;
if (unlikely(!tipc_sk_connected(sk))) {
if (sk->sk_state == TIPC_DISCONNECTING)
return -EPIPE;
else
return -ENOTCONN;
}
long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = &tsk->phdr;
struct net *net = sock_net(sk);
struct sk_buff_head pkts;
u32 dnode = tsk_peer_node(tsk);
int send, sent = 0;
int rc = 0;
timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
if (!timeo && tsk->link_cong)
return -ELINKCONG;
skb_queue_head_init(&pkts);
dnode = tsk_peer_node(tsk);
skb_queue_head_init(&pktchain);
if (unlikely(dlen > INT_MAX))
return -EMSGSIZE;
next:
save = m->msg_iter;
mtu = tsk->max_pkt;
send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain);
if (unlikely(rc < 0))
/* Handle implicit connection setup */
if (unlikely(dest)) {
rc = __tipc_sendmsg(sock, m, dlen);
if (dlen && (dlen == rc))
tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
return rc;
}
do {
if (likely(!tsk_conn_cong(tsk))) {
rc = tipc_node_xmit(net, &pktchain, dnode, portid);
if (likely(!rc)) {
tsk->snt_unacked += tsk_inc(tsk, send + hlen);
sent += send;
if (sent == dsz)
return dsz;
goto next;
}
if (rc == -EMSGSIZE) {
__skb_queue_purge(&pktchain);
tsk->max_pkt = tipc_node_get_mtu(net, dnode,
portid);
m->msg_iter = save;
goto next;
}
if (rc != -ELINKCONG)
break;
rc = tipc_wait_for_cond(sock, &timeout,
(!tsk->cong_link_cnt &&
!tsk_conn_cong(tsk) &&
tipc_sk_connected(sk)));
if (unlikely(rc))
break;
send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
if (unlikely(rc != send))
break;
tsk->link_cong = 1;
rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
if (unlikely(rc == -ELINKCONG)) {
tsk->cong_link_cnt = 1;
rc = 0;
}
if (likely(!rc)) {
tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
sent += send;
}
rc = tipc_wait_for_sndpkt(sock, &timeo);
} while (!rc);
} while (sent < dlen && !rc);
__skb_queue_purge(&pktchain);
return sent ? sent : rc;
return rc ? rc : sent;
}
/**
......@@ -1131,7 +1079,7 @@ static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
if (dsz > TIPC_MAX_USER_MSG_SIZE)
return -EMSGSIZE;
return tipc_send_stream(sock, m, dsz);
return tipc_sendstream(sock, m, dsz);
}
/* tipc_sk_finish_conn - complete the setup of a connection
......@@ -1698,6 +1646,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
unsigned int limit = rcvbuf_limit(sk, skb);
int err = TIPC_OK;
int usr = msg_user(hdr);
u32 onode;
if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
tipc_sk_proto_rcv(tsk, skb, xmitq);
......@@ -1705,8 +1654,10 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
}
if (unlikely(usr == SOCK_WAKEUP)) {
onode = msg_orignode(hdr);
kfree_skb(skb);
tsk->link_cong = 0;
u32_del(&tsk->cong_links, onode);
tsk->cong_link_cnt--;
sk->sk_write_space(sk);
return false;
}
......@@ -2114,7 +2065,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
struct msghdr m = {NULL,};
tsk_advance_rx_queue(sk);
__tipc_send_stream(new_sock, &m, 0);
__tipc_sendstream(new_sock, &m, 0);
} else {
__skb_dequeue(&sk->sk_receive_queue);
__skb_queue_head(&new_sk->sk_receive_queue, buf);
......@@ -2575,7 +2526,7 @@ static const struct proto_ops stream_ops = {
.shutdown = tipc_shutdown,
.setsockopt = tipc_setsockopt,
.getsockopt = tipc_getsockopt,
.sendmsg = tipc_send_stream,
.sendmsg = tipc_sendstream,
.recvmsg = tipc_recv_stream,
.mmap = sock_no_mmap,
.sendpage = sock_no_sendpage
......