Commit f9c935db authored by Jon Maloy's avatar Jon Maloy Committed by David S. Miller

tipc: fix problems with multipoint-to-point flow control

In commit 04d7b574 ("tipc: add multipoint-to-point flow control") we
introduced a protocol for preventing buffer overflow when many group
members try to simultaneously send messages to the same receiving member.

Stress test of this mechanism has revealed a couple of related bugs:

- When the receiving member receives an advertisement REMIT message from
  one of the senders, it will sometimes prematurely activate a pending
  member and send it the remitted advertisement, although the upper
  limit for active senders has been reached. This leads to accumulation
  of illegal advertisements, and eventually to messages being dropped
  because of receive buffer overflow.

- When the receiving member leaves REMITTED state while a received
  message is being read, we miss to look at the pending queue, to
  activate the oldest pending peer. This leads to some pending senders
  being starved out, and never getting the opportunity to profit from
  the remitted advertisement.

We fix the former in the function tipc_group_proto_rcv() by returning
directly from the function once it becomes clear that the remitting
peer cannot leave REMITTED state at that point.

We fix the latter in the function tipc_group_update_rcv_win() by looking
up and activate the longest pending peer when it becomes clear that the
remitting peer now can leave REMITTED state.
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 71891e2d
...@@ -109,7 +109,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, ...@@ -109,7 +109,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
static void tipc_group_decr_active(struct tipc_group *grp, static void tipc_group_decr_active(struct tipc_group *grp,
struct tipc_member *m) struct tipc_member *m)
{ {
if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING) if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING ||
m->state == MBR_REMITTED)
grp->active_cnt--; grp->active_cnt--;
} }
...@@ -562,7 +563,7 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, ...@@ -562,7 +563,7 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
int max_active = grp->max_active; int max_active = grp->max_active;
int reclaim_limit = max_active * 3 / 4; int reclaim_limit = max_active * 3 / 4;
int active_cnt = grp->active_cnt; int active_cnt = grp->active_cnt;
struct tipc_member *m, *rm; struct tipc_member *m, *rm, *pm;
m = tipc_group_find_member(grp, node, port); m = tipc_group_find_member(grp, node, port);
if (!m) if (!m)
...@@ -605,6 +606,17 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, ...@@ -605,6 +606,17 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
pr_warn_ratelimited("Rcv unexpected msg after REMIT\n"); pr_warn_ratelimited("Rcv unexpected msg after REMIT\n");
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
} }
grp->active_cnt--;
list_del_init(&m->list);
if (list_empty(&grp->pending))
return;
/* Set oldest pending member to active and advertise */
pm = list_first_entry(&grp->pending, struct tipc_member, list);
pm->state = MBR_ACTIVE;
list_move_tail(&pm->list, &grp->active);
grp->active_cnt++;
tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
break; break;
case MBR_RECLAIMING: case MBR_RECLAIMING:
case MBR_DISCOVERED: case MBR_DISCOVERED:
...@@ -742,14 +754,14 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, ...@@ -742,14 +754,14 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
if (!m || m->state != MBR_RECLAIMING) if (!m || m->state != MBR_RECLAIMING)
return; return;
list_del_init(&m->list);
grp->active_cnt--;
remitted = msg_grp_remitted(hdr); remitted = msg_grp_remitted(hdr);
/* Messages preceding the REMIT still in receive queue */ /* Messages preceding the REMIT still in receive queue */
if (m->advertised > remitted) { if (m->advertised > remitted) {
m->state = MBR_REMITTED; m->state = MBR_REMITTED;
in_flight = m->advertised - remitted; in_flight = m->advertised - remitted;
m->advertised = ADV_IDLE + in_flight;
return;
} }
/* All messages preceding the REMIT have been read */ /* All messages preceding the REMIT have been read */
if (m->advertised <= remitted) { if (m->advertised <= remitted) {
...@@ -761,6 +773,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, ...@@ -761,6 +773,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
m->advertised = ADV_IDLE + in_flight; m->advertised = ADV_IDLE + in_flight;
grp->active_cnt--;
list_del_init(&m->list);
/* Set oldest pending member to active and advertise */ /* Set oldest pending member to active and advertise */
if (list_empty(&grp->pending)) if (list_empty(&grp->pending))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment