Commit 2320bcda authored by Tuong Lien, committed by David S. Miller

tipc: fix changeover issues due to large packet

When the interfaces' MTU is changed (especially in the case of bonding)
while the TIPC links are brought up and down within a short time, a
couple of issues show up in the current link changeover mechanism:

1) When one link is brought up but immediately forced down again, the
failover procedure is carried out to fail over all the messages in the
link's transmq queue onto the other working link. The link and node
state is also set to FAILINGOVER as part of the process. Each message
is transmitted in the form of a FAILOVER_MSG, so its size grows by 40
bytes (the tunnel message header size). There is no problem as long as
the original message size does not exceed the link's MTU - 40, and
indeed this is the maximum size of a normal payload message. However,
in the situation above, because the link has only just come up, the
messages in the link's transmq are mostly SYNCH_MSGs generated by the
link synching procedure, so their size may already have reached the
maximum (i.e. the link MTU). When a FAILOVER_MSG is built on top of
such a SYNCH_MSG, its size exceeds the link's MTU. As a result, the
messages are silently dropped, the failover procedure never completes,
and the link cannot exit the FAILINGOVER state, so it cannot be
re-established.
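
To make the arithmetic concrete, here is a minimal, standalone sketch
(illustration only, not part of the patch) of the overflow in 1),
assuming the 40-byte TIPC internal header (INT_H_SIZE) and an arbitrary
example MTU:

#include <stdio.h>

#define INT_H_SIZE 40	/* TIPC internal (tunnel) header size */

int main(void)
{
	int mtu = 1500;				/* working link MTU (example) */
	int synch_msg = mtu;			/* SYNCH_MSG already at the maximum */
	int failover_msg = synch_msg + INT_H_SIZE;	/* wrapped as FAILOVER_MSG */

	if (failover_msg > mtu)
		printf("FAILOVER_MSG (%d) > MTU (%d): silently dropped, failover stalls\n",
		       failover_msg, mtu);
	return 0;
}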

2) The same scenario can happen even more easily when the links' MTUs
are set differently or are being changed. In that case, as soon as a
large message in the failing link's transmq queue has been built and
fragmented with that link's MTU larger than the other link's, the issue
occurs (no prior link synching is needed).

3) The link synching procedure faces the same issue, but since synching
only starts upon receipt of a SYNCH_MSG, dropping the message does not
result in a state deadlock; still, it is not the intended behaviour.

Issues 1) & 3) are resolved by the previous commit, which makes the
link synching generate only a dummy SYNCH_MSG (i.e. without data), so
the size of any FAILOVER_MSG built on top of it can never exceed the
link's MTU.

For issue 2), the only solution is to fragment the messages in the
failing link's transmq queue according to the working link's MTU so
that they can be failed over. A new function is added to accomplish
this: the result is still a TUNNEL_PROTOCOL/FAILOVER_MSG, but if the
original message is too large, it is fragmented and then reassembled
at the receiving side.
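
As an illustration of the fragment sizing this new function performs
(the actual implementation, tipc_msg_fragment(), is in the diff below
and works on sk_buffs), here is a simplified sketch with plain
integers; the header size matches INT_H_SIZE, the other values are
example assumptions:

#include <stdio.h>

#define INT_H_SIZE 40	/* tunnel header prepended to every fragment */

int main(void)
{
	int pktmax = 1500;			/* working link MTU (example) */
	int dmax = pktmax - INT_H_SIZE;		/* payload room per fragment */
	int dsz = 3000;				/* oversized message to tunnel */
	int nof_fragms = dsz / dmax + 1;	/* same formula as the patch */
	int pktno;

	for (pktno = 1; pktno <= nof_fragms; pktno++) {
		int eat = (pktno < nof_fragms) ? dmax : dsz % dmax;

		printf("fragment %d/%d: %d payload + %d header bytes\n",
		       pktno, nof_fragms, eat, INT_H_SIZE);
	}
	return 0;
}
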
Acked-by: Ying Xue <ying.xue@windriver.com>
Acked-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Tuong Lien <tuong.t.lien@dektech.com.au>
Signed-off-by: David S. Miller <davem@davemloft.net>
parent 4929a932
@@ -180,6 +180,7 @@ struct tipc_link {
	/* Fragmentation/reassembly */
	struct sk_buff *reasm_buf;
	struct sk_buff *reasm_tnlmsg;

	/* Broadcast */
	u16 ackers;
@@ -897,8 +898,10 @@ void tipc_link_reset(struct tipc_link *l)
	l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
	l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
	kfree_skb(l->reasm_buf);
	kfree_skb(l->reasm_tnlmsg);
	kfree_skb(l->failover_reasm_skb);
	l->reasm_buf = NULL;
	l->reasm_tnlmsg = NULL;
	l->failover_reasm_skb = NULL;
	l->rcv_unacked = 0;
	l->snd_nxt = 1;
@@ -940,6 +943,9 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
	int rc = 0;

	if (unlikely(msg_size(hdr) > mtu)) {
		pr_warn("Too large msg, purging xmit list %d %d %d %d %d!\n",
			skb_queue_len(list), msg_user(hdr),
			msg_type(hdr), msg_size(hdr), mtu);
		skb_queue_purge(list);
		return -EMSGSIZE;
	}
@@ -1233,6 +1239,7 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb,
			     struct sk_buff_head *inputq)
{
	struct sk_buff **reasm_skb = &l->failover_reasm_skb;
	struct sk_buff **reasm_tnlmsg = &l->reasm_tnlmsg;
	struct sk_buff_head *fdefq = &l->failover_deferdq;
	struct tipc_msg *hdr = buf_msg(skb);
	struct sk_buff *iskb;
@@ -1240,40 +1247,56 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb,
	int rc = 0;
	u16 seqno;

	if (msg_type(hdr) == SYNCH_MSG) {
		kfree_skb(skb);
		return 0;
	}

	/* Not a fragment? */
	if (likely(!msg_nof_fragms(hdr))) {
		if (unlikely(!tipc_msg_extract(skb, &iskb, &ipos))) {
			pr_warn_ratelimited("Unable to extract msg, defq: %d\n",
					    skb_queue_len(fdefq));
			return 0;
		}
		kfree_skb(skb);
	} else {
		/* Set fragment type for buf_append */
		if (msg_fragm_no(hdr) == 1)
			msg_set_type(hdr, FIRST_FRAGMENT);
		else if (msg_fragm_no(hdr) < msg_nof_fragms(hdr))
			msg_set_type(hdr, FRAGMENT);
		else
			msg_set_type(hdr, LAST_FRAGMENT);

		if (!tipc_buf_append(reasm_tnlmsg, &skb)) {
			/* Successful but non-complete reassembly? */
			if (*reasm_tnlmsg || link_is_bc_rcvlink(l))
				return 0;
			pr_warn_ratelimited("Unable to reassemble tunnel msg\n");
			return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
		}
		iskb = skb;
	}

	do {
		seqno = buf_seqno(iskb);
		if (unlikely(less(seqno, l->drop_point))) {
			kfree_skb(iskb);
			continue;
		}
		if (unlikely(seqno != l->drop_point)) {
			__tipc_skb_queue_sorted(fdefq, seqno, iskb);
			continue;
		}

		l->drop_point++;
		if (!tipc_data_input(l, iskb, inputq))
			rc |= tipc_link_input(l, iskb, inputq, reasm_skb);
		if (unlikely(rc))
			break;
	} while ((iskb = __tipc_skb_dequeue(fdefq, l->drop_point)));

	return rc;
}
@@ -1663,15 +1686,18 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
	struct sk_buff *skb, *tnlskb;
	struct tipc_msg *hdr, tnlhdr;
	struct sk_buff_head *queue = &l->transmq;
	struct sk_buff_head tmpxq, tnlq, frags;
	u16 pktlen, pktcnt, seqno = l->snd_nxt;
	bool pktcnt_need_update = false;
	u16 syncpt;
	int rc;

	if (!tnl)
		return;

	skb_queue_head_init(&tnlq);
	skb_queue_head_init(&tmpxq);
	skb_queue_head_init(&frags);

	/* At least one packet required for safe algorithm => add dummy */
	skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG,
@@ -1727,6 +1753,39 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
		if (queue == &l->backlogq)
			msg_set_seqno(hdr, seqno++);
		pktlen = msg_size(hdr);

		/* Tunnel link MTU is not large enough? This could be
		 * due to:
		 * 1) Link MTU has just changed or set differently;
		 * 2) Or FAILOVER on the top of a SYNCH message
		 *
		 * The 2nd case should not happen if peer supports
		 * TIPC_TUNNEL_ENHANCED
		 */
		if (pktlen > tnl->mtu - INT_H_SIZE) {
			if (mtyp == FAILOVER_MSG &&
			    (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) {
				rc = tipc_msg_fragment(skb, &tnlhdr, tnl->mtu,
						       &frags);
				if (rc) {
					pr_warn("%sunable to frag msg: rc %d\n",
						link_co_err, rc);
					return;
				}
				pktcnt += skb_queue_len(&frags) - 1;
				pktcnt_need_update = true;
				skb_queue_splice_tail_init(&frags, &tnlq);
				continue;
			}
			/* Unluckily, peer doesn't have TIPC_TUNNEL_ENHANCED
			 * => Just warn it and return!
			 */
			pr_warn_ratelimited("%stoo large msg <%d, %d>: %d!\n",
					    link_co_err, msg_user(hdr),
					    msg_type(hdr), msg_size(hdr));
			return;
		}

		msg_set_size(&tnlhdr, pktlen + INT_H_SIZE);
		tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE, GFP_ATOMIC);
		if (!tnlskb) {
@@ -1742,6 +1801,12 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
		goto tnl;
	}

	if (pktcnt_need_update)
		skb_queue_walk(&tnlq, skb) {
			hdr = buf_msg(skb);
			msg_set_msgcnt(hdr, pktcnt);
		}

	tipc_link_xmit(tnl, &tnlq, xmitq);

	if (mtyp == FAILOVER_MSG) {
...
@@ -243,6 +243,65 @@ bool tipc_msg_validate(struct sk_buff **_skb)
	return true;
}

/**
 * tipc_msg_fragment - build a fragment skb list for TIPC message
 *
 * @skb: TIPC message skb
 * @hdr: internal msg header to be put on the top of the fragments
 * @pktmax: max size of a fragment incl. the header
 * @frags: returned fragment skb list
 *
 * Returns 0 if the fragmentation is successful, otherwise: -EINVAL
 * or -ENOMEM
 */
int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr,
		      int pktmax, struct sk_buff_head *frags)
{
	int pktno, nof_fragms, dsz, dmax, eat;
	struct tipc_msg *_hdr;
	struct sk_buff *_skb;
	u8 *data;

	/* Non-linear buffer? */
	if (skb_linearize(skb))
		return -ENOMEM;

	data = (u8 *)skb->data;
	dsz = msg_size(buf_msg(skb));
	dmax = pktmax - INT_H_SIZE;
	if (dsz <= dmax || !dmax)
		return -EINVAL;

	nof_fragms = dsz / dmax + 1;
	for (pktno = 1; pktno <= nof_fragms; pktno++) {
		if (pktno < nof_fragms)
			eat = dmax;
		else
			eat = dsz % dmax;
		/* Allocate a new fragment */
		_skb = tipc_buf_acquire(INT_H_SIZE + eat, GFP_ATOMIC);
		if (!_skb)
			goto error;
		skb_orphan(_skb);
		__skb_queue_tail(frags, _skb);
		/* Copy header & data to the fragment */
		skb_copy_to_linear_data(_skb, hdr, INT_H_SIZE);
		skb_copy_to_linear_data_offset(_skb, INT_H_SIZE, data, eat);
		data += eat;
		/* Update the fragment's header */
		_hdr = buf_msg(_skb);
		msg_set_fragm_no(_hdr, pktno);
		msg_set_nof_fragms(_hdr, nof_fragms);
		msg_set_size(_hdr, INT_H_SIZE + eat);
	}
	return 0;

error:
	__skb_queue_purge(frags);
	__skb_queue_head_init(frags);
	return -ENOMEM;
}

/**
 * tipc_msg_build - create buffer chain containing specified header and data
 * @mhdr: Message header, to be prepended to data
...
@@ -721,12 +721,26 @@ static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n)
	msg_set_bits(m, 4, 16, 0xffff, n);
}

static inline u32 msg_nof_fragms(struct tipc_msg *m)
{
	return msg_bits(m, 4, 0, 0xffff);
}

static inline void msg_set_nof_fragms(struct tipc_msg *m, u32 n)
{
	msg_set_bits(m, 4, 0, 0xffff, n);
}

static inline u32 msg_fragm_no(struct tipc_msg *m)
{
	return msg_bits(m, 4, 16, 0xffff);
}

static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n)
{
	msg_set_bits(m, 4, 16, 0xffff, n);
}

static inline u16 msg_next_sent(struct tipc_msg *m)
{
	return msg_bits(m, 4, 0, 0xffff);
@@ -1045,6 +1059,8 @@ bool tipc_msg_bundle(struct sk_buff *skb, struct tipc_msg *msg, u32 mtu);
bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg,
			  u32 mtu, u32 dnode);
bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr,
		      int pktmax, struct sk_buff_head *frags);
int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
		   int offset, int dsz, int mtu, struct sk_buff_head *list);
bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
...