Commit 60120526 authored by Jon Paul Maloy's avatar Jon Paul Maloy Committed by David S. Miller

tipc: simplify connection congestion handling

As a consequence of the recently introduced serialized access
to the socket in commit 8d94168a761819d10252bab1f8de6d7b202c3baa
("tipc: same receive code path for connection protocol and data
messages") we can make a number of simplifications in the
detection and handling of connection congestion situations.

- We don't need to keep two counters, one for sent messages and one
  for acked messages. There is no longer any risk for races between
  acknowledge messages arriving in BH and data message sending
  running in user context. So we merge this into one counter,
  'sent_unacked', which is incremented at sending and subtracted
  from at acknowledge reception.

- We don't need to set the 'congested' field in tipc_port to
  true before we sent the message, and clear it when sending
  is successful. (As a matter of fact, it was never necessary;
  the field was set in link_schedule_port() before any wakeup
  could arrive anyway.)

- We keep the conditions for link congestion and connection connection
  congestion separated. There would otherwise be a risk that an arriving
  acknowledge message may wake up a user sleeping because of link
  congestion.

- We can simplify reception of acknowledge messages.

We also make some cosmetic/structural changes:

- We rename the 'congested' field to the more correct 'link_cong´.

- We rename 'conn_unacked' to 'rcv_unacked'

- We move the above mentioned fields from struct tipc_port to
  struct tipc_sock.
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Reviewed-by: default avatarErik Hugne <erik.hugne@ericsson.com>
Reviewed-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent ac0074ee
...@@ -332,13 +332,15 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down) ...@@ -332,13 +332,15 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz) static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
{ {
struct tipc_port *p_ptr; struct tipc_port *p_ptr;
struct tipc_sock *tsk;
spin_lock_bh(&tipc_port_list_lock); spin_lock_bh(&tipc_port_list_lock);
p_ptr = tipc_port_lock(origport); p_ptr = tipc_port_lock(origport);
if (p_ptr) { if (p_ptr) {
if (!list_empty(&p_ptr->wait_list)) if (!list_empty(&p_ptr->wait_list))
goto exit; goto exit;
p_ptr->congested = 1; tsk = tipc_port_to_sock(p_ptr);
tsk->link_cong = 1;
p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt); p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports); list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
l_ptr->stats.link_congs++; l_ptr->stats.link_congs++;
...@@ -352,6 +354,7 @@ static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz) ...@@ -352,6 +354,7 @@ static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all) void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
{ {
struct tipc_port *p_ptr; struct tipc_port *p_ptr;
struct tipc_sock *tsk;
struct tipc_port *temp_p_ptr; struct tipc_port *temp_p_ptr;
int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size; int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
...@@ -367,10 +370,11 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all) ...@@ -367,10 +370,11 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
wait_list) { wait_list) {
if (win <= 0) if (win <= 0)
break; break;
tsk = tipc_port_to_sock(p_ptr);
list_del_init(&p_ptr->wait_list); list_del_init(&p_ptr->wait_list);
spin_lock_bh(p_ptr->lock); spin_lock_bh(p_ptr->lock);
p_ptr->congested = 0; tsk->link_cong = 0;
tipc_port_wakeup(p_ptr); tipc_sock_wakeup(tsk);
win -= p_ptr->waiting_pkts; win -= p_ptr->waiting_pkts;
spin_unlock_bh(p_ptr->lock); spin_unlock_bh(p_ptr->lock);
} }
......
...@@ -186,12 +186,6 @@ void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp) ...@@ -186,12 +186,6 @@ void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp)
tipc_port_list_free(dp); tipc_port_list_free(dp);
} }
void tipc_port_wakeup(struct tipc_port *port)
{
tipc_sock_wakeup(tipc_port_to_sock(port));
}
/* tipc_port_init - intiate TIPC port and lock it /* tipc_port_init - intiate TIPC port and lock it
* *
* Returns obtained reference if initialization is successful, zero otherwise * Returns obtained reference if initialization is successful, zero otherwise
...@@ -209,7 +203,6 @@ u32 tipc_port_init(struct tipc_port *p_ptr, ...@@ -209,7 +203,6 @@ u32 tipc_port_init(struct tipc_port *p_ptr,
} }
p_ptr->max_pkt = MAX_PKT_DEFAULT; p_ptr->max_pkt = MAX_PKT_DEFAULT;
p_ptr->sent = 1;
p_ptr->ref = ref; p_ptr->ref = ref;
INIT_LIST_HEAD(&p_ptr->wait_list); INIT_LIST_HEAD(&p_ptr->wait_list);
INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
...@@ -459,10 +452,9 @@ void tipc_acknowledge(u32 ref, u32 ack) ...@@ -459,10 +452,9 @@ void tipc_acknowledge(u32 ref, u32 ack)
p_ptr = tipc_port_lock(ref); p_ptr = tipc_port_lock(ref);
if (!p_ptr) if (!p_ptr)
return; return;
if (p_ptr->connected) { if (p_ptr->connected)
p_ptr->conn_unacked -= ack;
buf = port_build_proto_msg(p_ptr, CONN_ACK, ack); buf = port_build_proto_msg(p_ptr, CONN_ACK, ack);
}
tipc_port_unlock(p_ptr); tipc_port_unlock(p_ptr);
if (!buf) if (!buf)
return; return;
......
...@@ -53,17 +53,13 @@ ...@@ -53,17 +53,13 @@
* @connected: non-zero if port is currently connected to a peer port * @connected: non-zero if port is currently connected to a peer port
* @conn_type: TIPC type used when connection was established * @conn_type: TIPC type used when connection was established
* @conn_instance: TIPC instance used when connection was established * @conn_instance: TIPC instance used when connection was established
* @conn_unacked: number of unacknowledged messages received from peer port
* @published: non-zero if port has one or more associated names * @published: non-zero if port has one or more associated names
* @congested: non-zero if cannot send because of link or port congestion
* @max_pkt: maximum packet size "hint" used when building messages sent by port * @max_pkt: maximum packet size "hint" used when building messages sent by port
* @ref: unique reference to port in TIPC object registry * @ref: unique reference to port in TIPC object registry
* @phdr: preformatted message header used when sending messages * @phdr: preformatted message header used when sending messages
* @port_list: adjacent ports in TIPC's global list of ports * @port_list: adjacent ports in TIPC's global list of ports
* @wait_list: adjacent ports in list of ports waiting on link congestion * @wait_list: adjacent ports in list of ports waiting on link congestion
* @waiting_pkts: * @waiting_pkts:
* @sent: # of non-empty messages sent by port
* @acked: # of non-empty message acknowledgements from connected port's peer
* @publications: list of publications for port * @publications: list of publications for port
* @pub_count: total # of publications port has made during its lifetime * @pub_count: total # of publications port has made during its lifetime
* @probing_state: * @probing_state:
...@@ -76,17 +72,13 @@ struct tipc_port { ...@@ -76,17 +72,13 @@ struct tipc_port {
int connected; int connected;
u32 conn_type; u32 conn_type;
u32 conn_instance; u32 conn_instance;
u32 conn_unacked;
int published; int published;
u32 congested;
u32 max_pkt; u32 max_pkt;
u32 ref; u32 ref;
struct tipc_msg phdr; struct tipc_msg phdr;
struct list_head port_list; struct list_head port_list;
struct list_head wait_list; struct list_head wait_list;
u32 waiting_pkts; u32 waiting_pkts;
u32 sent;
u32 acked;
struct list_head publications; struct list_head publications;
u32 pub_count; u32 pub_count;
u32 probing_state; u32 probing_state;
...@@ -120,8 +112,6 @@ int tipc_port_disconnect(u32 portref); ...@@ -120,8 +112,6 @@ int tipc_port_disconnect(u32 portref);
int tipc_port_shutdown(u32 ref); int tipc_port_shutdown(u32 ref);
void tipc_port_wakeup(struct tipc_port *port);
/* /*
* The following routines require that the port be locked on entry * The following routines require that the port be locked on entry
*/ */
...@@ -161,12 +151,6 @@ static inline void tipc_port_unlock(struct tipc_port *p_ptr) ...@@ -161,12 +151,6 @@ static inline void tipc_port_unlock(struct tipc_port *p_ptr)
spin_unlock_bh(p_ptr->lock); spin_unlock_bh(p_ptr->lock);
} }
static inline int tipc_port_congested(struct tipc_port *p_ptr)
{
return ((p_ptr->sent - p_ptr->acked) >= TIPC_FLOWCTRL_WIN);
}
static inline u32 tipc_port_peernode(struct tipc_port *p_ptr) static inline u32 tipc_port_peernode(struct tipc_port *p_ptr)
{ {
return msg_destnode(&p_ptr->phdr); return msg_destnode(&p_ptr->phdr);
......
...@@ -207,7 +207,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock, ...@@ -207,7 +207,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
sk->sk_data_ready = tipc_data_ready; sk->sk_data_ready = tipc_data_ready;
sk->sk_write_space = tipc_write_space; sk->sk_write_space = tipc_write_space;
tsk->conn_timeout = CONN_TIMEOUT_DEFAULT; tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
tsk->port.sent = 0; tsk->sent_unacked = 0;
atomic_set(&tsk->dupl_rcvcnt, 0); atomic_set(&tsk->dupl_rcvcnt, 0);
tipc_port_unlock(port); tipc_port_unlock(port);
...@@ -513,12 +513,12 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, ...@@ -513,12 +513,12 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
switch ((int)sock->state) { switch ((int)sock->state) {
case SS_UNCONNECTED: case SS_UNCONNECTED:
if (!tsk->port.congested) if (!tsk->link_cong)
mask |= POLLOUT; mask |= POLLOUT;
break; break;
case SS_READY: case SS_READY:
case SS_CONNECTED: case SS_CONNECTED:
if (!tsk->port.congested) if (!tsk->link_cong && !tipc_sk_conn_cong(tsk))
mask |= POLLOUT; mask |= POLLOUT;
/* fall thru' */ /* fall thru' */
case SS_CONNECTING: case SS_CONNECTING:
...@@ -546,7 +546,7 @@ int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, struct sk_buff *buf) ...@@ -546,7 +546,7 @@ int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, struct sk_buff *buf)
{ {
struct tipc_msg *msg = buf_msg(buf); struct tipc_msg *msg = buf_msg(buf);
struct tipc_port *port = &tsk->port; struct tipc_port *port = &tsk->port;
int wakeable; int conn_cong;
/* Ignore if connection cannot be validated: */ /* Ignore if connection cannot be validated: */
if (!port->connected || !tipc_port_peer_msg(port, msg)) if (!port->connected || !tipc_port_peer_msg(port, msg))
...@@ -555,13 +555,10 @@ int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, struct sk_buff *buf) ...@@ -555,13 +555,10 @@ int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, struct sk_buff *buf)
port->probing_state = TIPC_CONN_OK; port->probing_state = TIPC_CONN_OK;
if (msg_type(msg) == CONN_ACK) { if (msg_type(msg) == CONN_ACK) {
wakeable = tipc_port_congested(port) && port->congested; conn_cong = tipc_sk_conn_cong(tsk);
port->acked += msg_msgcnt(msg); tsk->sent_unacked -= msg_msgcnt(msg);
if (!tipc_port_congested(port)) { if (conn_cong)
port->congested = 0; tipc_sock_wakeup(tsk);
if (wakeable)
tipc_port_wakeup(port);
}
} else if (msg_type(msg) == CONN_PROBE) { } else if (msg_type(msg) == CONN_PROBE) {
if (!tipc_msg_reverse(buf, dnode, TIPC_OK)) if (!tipc_msg_reverse(buf, dnode, TIPC_OK))
return TIPC_OK; return TIPC_OK;
...@@ -626,7 +623,7 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) ...@@ -626,7 +623,7 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
return sock_intr_errno(*timeo_p); return sock_intr_errno(*timeo_p);
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
done = sk_wait_event(sk, timeo_p, !tsk->port.congested); done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
finish_wait(sk_sleep(sk), &wait); finish_wait(sk_sleep(sk), &wait);
} while (!done); } while (!done);
return 0; return 0;
...@@ -800,7 +797,6 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) ...@@ -800,7 +797,6 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
{ {
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk); struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_port *port = &tsk->port;
DEFINE_WAIT(wait); DEFINE_WAIT(wait);
int done; int done;
...@@ -819,7 +815,9 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) ...@@ -819,7 +815,9 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
done = sk_wait_event(sk, timeo_p, done = sk_wait_event(sk, timeo_p,
(!port->congested || !port->connected)); (!tsk->link_cong &&
!tipc_sk_conn_cong(tsk)) ||
!tsk->port.connected);
finish_wait(sk_sleep(sk), &wait); finish_wait(sk_sleep(sk), &wait);
} while (!done); } while (!done);
return 0; return 0;
...@@ -856,7 +854,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, ...@@ -856,7 +854,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
if (unlikely(dest)) { if (unlikely(dest)) {
rc = tipc_sendmsg(iocb, sock, m, dsz); rc = tipc_sendmsg(iocb, sock, m, dsz);
if (dsz && (dsz == rc)) if (dsz && (dsz == rc))
tsk->port.sent = 1; tsk->sent_unacked = 1;
return rc; return rc;
} }
if (dsz > (uint)INT_MAX) if (dsz > (uint)INT_MAX)
...@@ -875,7 +873,6 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, ...@@ -875,7 +873,6 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
dnode = tipc_port_peernode(port); dnode = tipc_port_peernode(port);
port->congested = 1;
next: next:
mtu = port->max_pkt; mtu = port->max_pkt;
...@@ -884,11 +881,10 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, ...@@ -884,11 +881,10 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
if (unlikely(rc < 0)) if (unlikely(rc < 0))
goto exit; goto exit;
do { do {
port->congested = 1; if (likely(!tipc_sk_conn_cong(tsk))) {
if (likely(!tipc_port_congested(port))) {
rc = tipc_link_xmit2(buf, dnode, ref); rc = tipc_link_xmit2(buf, dnode, ref);
if (likely(!rc)) { if (likely(!rc)) {
port->sent++; tsk->sent_unacked++;
sent += send; sent += send;
if (sent == dsz) if (sent == dsz)
break; break;
...@@ -903,8 +899,6 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, ...@@ -903,8 +899,6 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
} }
rc = tipc_wait_for_sndpkt(sock, &timeo); rc = tipc_wait_for_sndpkt(sock, &timeo);
} while (!rc); } while (!rc);
port->congested = 0;
exit: exit:
if (iocb) if (iocb)
release_sock(sk); release_sock(sk);
...@@ -1169,8 +1163,10 @@ static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock, ...@@ -1169,8 +1163,10 @@ static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock,
/* Consume received message (optional) */ /* Consume received message (optional) */
if (likely(!(flags & MSG_PEEK))) { if (likely(!(flags & MSG_PEEK))) {
if ((sock->state != SS_READY) && if ((sock->state != SS_READY) &&
(++port->conn_unacked >= TIPC_CONNACK_INTV)) (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
tipc_acknowledge(port->ref, port->conn_unacked); tipc_acknowledge(port->ref, tsk->rcv_unacked);
tsk->rcv_unacked = 0;
}
advance_rx_queue(sk); advance_rx_queue(sk);
} }
exit: exit:
...@@ -1278,8 +1274,10 @@ static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock, ...@@ -1278,8 +1274,10 @@ static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock,
/* Consume received message (optional) */ /* Consume received message (optional) */
if (likely(!(flags & MSG_PEEK))) { if (likely(!(flags & MSG_PEEK))) {
if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV)) if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
tipc_acknowledge(port->ref, port->conn_unacked); tipc_acknowledge(port->ref, tsk->rcv_unacked);
tsk->rcv_unacked = 0;
}
advance_rx_queue(sk); advance_rx_queue(sk);
} }
......
...@@ -48,6 +48,9 @@ ...@@ -48,6 +48,9 @@
* @peer_name: the peer of the connection, if any * @peer_name: the peer of the connection, if any
* @conn_timeout: the time we can wait for an unresponded setup request * @conn_timeout: the time we can wait for an unresponded setup request
* @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
* @link_cong: non-zero if owner must sleep because of link congestion
* @sent_unacked: # messages sent by socket, and not yet acked by peer
* @rcv_unacked: # messages read by user, but not yet acked back to peer
*/ */
struct tipc_sock { struct tipc_sock {
...@@ -55,6 +58,9 @@ struct tipc_sock { ...@@ -55,6 +58,9 @@ struct tipc_sock {
struct tipc_port port; struct tipc_port port;
unsigned int conn_timeout; unsigned int conn_timeout;
atomic_t dupl_rcvcnt; atomic_t dupl_rcvcnt;
int link_cong;
uint sent_unacked;
uint rcv_unacked;
}; };
static inline struct tipc_sock *tipc_sk(const struct sock *sk) static inline struct tipc_sock *tipc_sk(const struct sock *sk)
...@@ -72,6 +78,11 @@ static inline void tipc_sock_wakeup(struct tipc_sock *tsk) ...@@ -72,6 +78,11 @@ static inline void tipc_sock_wakeup(struct tipc_sock *tsk)
tsk->sk.sk_write_space(&tsk->sk); tsk->sk.sk_write_space(&tsk->sk);
} }
static inline int tipc_sk_conn_cong(struct tipc_sock *tsk)
{
return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
}
int tipc_sk_rcv(struct sk_buff *buf); int tipc_sk_rcv(struct sk_buff *buf);
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment