Commit a3bada70 authored by Jon Maloy's avatar Jon Maloy Committed by David S. Miller

tipc: guarantee delivery of last broadcast before DOWN event

The following scenario is possible:
- A user sends a broadcast message, and thereafter immediately leaves
  the group.
- The LEAVE message, following a different path than the broadcast,
  arrives ahead of the broadcast, and the sending member is removed
  from the receiver's list.
- The broadcast message arrives, but is dropped because the sender
  now is unknown to the receipient.

We fix this by sequence numbering membership events, just like ordinary
unicast messages. Currently, when a JOIN is sent to a peer, it contains
a synchronization point, - the sequence number of the next sent
broadcast, in order to give the receiver a start synchronization point.
We now let even LEAVE messages contain such an "end synchronization"
point, so that the recipient can delay the removal of the sending member
until it knows that all messages have been received.

The received synchronization points are added as sequence numbers to the
generated membership events, making it possible to handle them almost
the same way as regular unicasts in the receiving filter function. In
particular, a DOWN event with a too high sequence number will be kept
in the reordering queue until the missing broadcast(s) arrive and have
been delivered.
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Acked-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 399574d4
...@@ -71,6 +71,7 @@ struct tipc_member { ...@@ -71,6 +71,7 @@ struct tipc_member {
u16 advertised; u16 advertised;
u16 window; u16 window;
u16 bc_rcv_nxt; u16 bc_rcv_nxt;
u16 bc_syncpt;
u16 bc_acked; u16 bc_acked;
bool usr_pending; bool usr_pending;
}; };
...@@ -410,7 +411,7 @@ static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) ...@@ -410,7 +411,7 @@ static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq)
struct sk_buff *_skb, *tmp; struct sk_buff *_skb, *tmp;
int mtyp = msg_type(hdr); int mtyp = msg_type(hdr);
/* Bcast may be bypassed by unicast or other bcast, - sort it in */ /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */
if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
skb_queue_walk_safe(defq, _skb, tmp) { skb_queue_walk_safe(defq, _skb, tmp) {
_hdr = buf_msg(_skb); _hdr = buf_msg(_skb);
...@@ -431,7 +432,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, ...@@ -431,7 +432,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
struct sk_buff_head *xmitq) struct sk_buff_head *xmitq)
{ {
struct sk_buff *skb = __skb_dequeue(inputq); struct sk_buff *skb = __skb_dequeue(inputq);
bool ack, deliver, update; bool ack, deliver, update, leave = false;
struct sk_buff_head *defq; struct sk_buff_head *defq;
struct tipc_member *m; struct tipc_member *m;
struct tipc_msg *hdr; struct tipc_msg *hdr;
...@@ -448,13 +449,6 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, ...@@ -448,13 +449,6 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
if (!msg_in_group(hdr)) if (!msg_in_group(hdr))
goto drop; goto drop;
if (msg_is_grp_evt(hdr)) {
if (!grp->events)
goto drop;
__skb_queue_tail(inputq, skb);
return;
}
m = tipc_group_find_member(grp, node, port); m = tipc_group_find_member(grp, node, port);
if (!tipc_group_is_receiver(m)) if (!tipc_group_is_receiver(m))
goto drop; goto drop;
...@@ -490,6 +484,12 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, ...@@ -490,6 +484,12 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
break; break;
case TIPC_GRP_UCAST_MSG: case TIPC_GRP_UCAST_MSG:
break; break;
case TIPC_GRP_MEMBER_EVT:
if (m->state == MBR_LEAVING)
leave = true;
if (!grp->events)
deliver = false;
break;
default: default:
break; break;
} }
...@@ -504,6 +504,11 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, ...@@ -504,6 +504,11 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
if (ack) if (ack)
tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq); tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq);
if (leave) {
tipc_group_delete_member(grp, m);
__skb_queue_purge(defq);
break;
}
if (!update) if (!update)
continue; continue;
...@@ -561,6 +566,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, ...@@ -561,6 +566,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
msg_set_adv_win(hdr, adv); msg_set_adv_win(hdr, adv);
m->advertised += adv; m->advertised += adv;
} else if (mtyp == GRP_LEAVE_MSG) {
msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
} else if (mtyp == GRP_ADV_MSG) { } else if (mtyp == GRP_ADV_MSG) {
msg_set_adv_win(hdr, adv); msg_set_adv_win(hdr, adv);
m->advertised += adv; m->advertised += adv;
...@@ -577,6 +584,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, ...@@ -577,6 +584,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
u32 node = msg_orignode(hdr); u32 node = msg_orignode(hdr);
u32 port = msg_origport(hdr); u32 port = msg_origport(hdr);
struct tipc_member *m; struct tipc_member *m;
struct tipc_msg *ehdr;
if (!grp) if (!grp)
return; return;
...@@ -590,7 +598,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, ...@@ -590,7 +598,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
MBR_QUARANTINED); MBR_QUARANTINED);
if (!m) if (!m)
return; return;
m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr); m->bc_syncpt = msg_grp_bc_syncpt(hdr);
m->bc_rcv_nxt = m->bc_syncpt;
m->window += msg_adv_win(hdr); m->window += msg_adv_win(hdr);
/* Wait until PUBLISH event is received */ /* Wait until PUBLISH event is received */
...@@ -601,6 +610,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, ...@@ -601,6 +610,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
*usr_wakeup = true; *usr_wakeup = true;
m->usr_pending = false; m->usr_pending = false;
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
ehdr = buf_msg(m->event_msg);
msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
__skb_queue_tail(inputq, m->event_msg); __skb_queue_tail(inputq, m->event_msg);
} }
if (m->window < ADV_IDLE) if (m->window < ADV_IDLE)
...@@ -611,6 +622,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, ...@@ -611,6 +622,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
case GRP_LEAVE_MSG: case GRP_LEAVE_MSG:
if (!m) if (!m)
return; return;
m->bc_syncpt = msg_grp_bc_syncpt(hdr);
/* Wait until WITHDRAW event is received */ /* Wait until WITHDRAW event is received */
if (m->state != MBR_LEAVING) { if (m->state != MBR_LEAVING) {
...@@ -618,9 +630,10 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, ...@@ -618,9 +630,10 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
return; return;
} }
/* Otherwise deliver already received WITHDRAW event */ /* Otherwise deliver already received WITHDRAW event */
ehdr = buf_msg(m->event_msg);
msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
__skb_queue_tail(inputq, m->event_msg); __skb_queue_tail(inputq, m->event_msg);
*usr_wakeup = true; *usr_wakeup = true;
tipc_group_delete_member(grp, m);
list_del_init(&m->congested); list_del_init(&m->congested);
return; return;
case GRP_ADV_MSG: case GRP_ADV_MSG:
...@@ -662,6 +675,7 @@ void tipc_group_member_evt(struct tipc_group *grp, ...@@ -662,6 +675,7 @@ void tipc_group_member_evt(struct tipc_group *grp,
int event = evt->event; int event = evt->event;
struct tipc_member *m; struct tipc_member *m;
struct net *net; struct net *net;
bool node_up;
u32 self; u32 self;
if (!grp) if (!grp)
...@@ -695,6 +709,7 @@ void tipc_group_member_evt(struct tipc_group *grp, ...@@ -695,6 +709,7 @@ void tipc_group_member_evt(struct tipc_group *grp,
m->event_msg = skb; m->event_msg = skb;
m->state = MBR_PUBLISHED; m->state = MBR_PUBLISHED;
} else { } else {
msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
__skb_queue_tail(inputq, skb); __skb_queue_tail(inputq, skb);
m->state = MBR_JOINED; m->state = MBR_JOINED;
*usr_wakeup = true; *usr_wakeup = true;
...@@ -715,14 +730,18 @@ void tipc_group_member_evt(struct tipc_group *grp, ...@@ -715,14 +730,18 @@ void tipc_group_member_evt(struct tipc_group *grp,
*usr_wakeup = true; *usr_wakeup = true;
m->usr_pending = false; m->usr_pending = false;
node_up = tipc_node_is_up(net, node);
/* Hold back event if more messages might be expected */ /* Hold back event if more messages might be expected */
if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) { if (m->state != MBR_LEAVING && node_up) {
m->event_msg = skb; m->event_msg = skb;
m->state = MBR_LEAVING; m->state = MBR_LEAVING;
} else { } else {
if (node_up)
msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
else
msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt);
__skb_queue_tail(inputq, skb); __skb_queue_tail(inputq, skb);
tipc_group_delete_member(grp, m);
} }
list_del_init(&m->congested); list_del_init(&m->congested);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment