Commit a91d55d1, authored by Tuong Lien, committed by David S. Miller

tipc: enable broadcast retrans via unicast

In some environments, broadcast traffic is suppressed at high rates (i.e.
a kind of bandwidth limit setting). When such a limit is applied, TIPC
broadcast can still run successfully. However, under high load, some
packets will be dropped first and TIPC tries to retransmit them, but the
packet retransmission is intentionally broadcast too, which makes things
worse and does not help at all.

This commit enables broadcast retransmission via unicast, which only
retransmits packets to the specific peer that has actually reported a gap,
i.e. it does not broadcast to all nodes in the cluster. This prevents the
retransmissions from being suppressed, reduces the overhead that duplicates
cause on the other peers, and ultimately improves the overall TIPC
broadcast performance.

Note: the functionality can be turned on/off via the sysctl file:

echo 1 > /proc/sys/net/tipc/bc_retruni
echo 0 > /proc/sys/net/tipc/bc_retruni

The default is '0', i.e. broadcast retransmission still works as before.
Acked-by: Ying Xue <ying.xue@windriver.com>
Acked-by: Jon Maloy <jmaloy@redhat.com>
Signed-off-by: Tuong Lien <tuong.t.lien@dektech.com.au>
Signed-off-by: David S. Miller <davem@davemloft.net>
parent c6ed7a5c
...@@ -46,6 +46,7 @@ ...@@ -46,6 +46,7 @@
#define BCLINK_WIN_MIN 32 /* bcast minimum link window size */ #define BCLINK_WIN_MIN 32 /* bcast minimum link window size */
const char tipc_bclink_name[] = "broadcast-link"; const char tipc_bclink_name[] = "broadcast-link";
unsigned long sysctl_tipc_bc_retruni __read_mostly;
/** /**
* struct tipc_bc_base - base structure for keeping broadcast send state * struct tipc_bc_base - base structure for keeping broadcast send state
...@@ -474,7 +475,7 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, ...@@ -474,7 +475,7 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
__skb_queue_head_init(&xmitq); __skb_queue_head_init(&xmitq);
tipc_bcast_lock(net); tipc_bcast_lock(net);
tipc_link_bc_ack_rcv(l, acked, 0, NULL, &xmitq); tipc_link_bc_ack_rcv(l, acked, 0, NULL, &xmitq, NULL);
tipc_bcast_unlock(net); tipc_bcast_unlock(net);
tipc_bcbase_xmit(net, &xmitq); tipc_bcbase_xmit(net, &xmitq);
...@@ -489,7 +490,8 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, ...@@ -489,7 +490,8 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
* RCU is locked, no other locks set * RCU is locked, no other locks set
*/ */
int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l, int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
struct tipc_msg *hdr) struct tipc_msg *hdr,
struct sk_buff_head *retrq)
{ {
struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
struct tipc_gap_ack_blks *ga; struct tipc_gap_ack_blks *ga;
...@@ -503,8 +505,11 @@ int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l, ...@@ -503,8 +505,11 @@ int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
tipc_link_bc_init_rcv(l, hdr); tipc_link_bc_init_rcv(l, hdr);
} else if (!msg_bc_ack_invalid(hdr)) { } else if (!msg_bc_ack_invalid(hdr)) {
tipc_get_gap_ack_blks(&ga, l, hdr, false); tipc_get_gap_ack_blks(&ga, l, hdr, false);
if (!sysctl_tipc_bc_retruni)
retrq = &xmitq;
rc = tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), rc = tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr),
msg_bc_gap(hdr), ga, &xmitq); msg_bc_gap(hdr), ga, &xmitq,
retrq);
rc |= tipc_link_bc_sync_rcv(l, hdr, &xmitq); rc |= tipc_link_bc_sync_rcv(l, hdr, &xmitq);
} }
tipc_bcast_unlock(net); tipc_bcast_unlock(net);
......
...@@ -45,6 +45,7 @@ struct tipc_nl_msg; ...@@ -45,6 +45,7 @@ struct tipc_nl_msg;
struct tipc_nlist; struct tipc_nlist;
struct tipc_nitem; struct tipc_nitem;
extern const char tipc_bclink_name[]; extern const char tipc_bclink_name[];
extern unsigned long sysctl_tipc_bc_retruni;
#define TIPC_METHOD_EXPIRE msecs_to_jiffies(5000) #define TIPC_METHOD_EXPIRE msecs_to_jiffies(5000)
...@@ -93,7 +94,8 @@ int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb); ...@@ -93,7 +94,8 @@ int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
struct tipc_msg *hdr); struct tipc_msg *hdr);
int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l, int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
struct tipc_msg *hdr); struct tipc_msg *hdr,
struct sk_buff_head *retrq);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg); int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]); int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
int tipc_bclink_reset_stats(struct net *net); int tipc_bclink_reset_stats(struct net *net);
......
...@@ -375,7 +375,7 @@ void tipc_link_remove_bc_peer(struct tipc_link *snd_l, ...@@ -375,7 +375,7 @@ void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
snd_l->ackers--; snd_l->ackers--;
rcv_l->bc_peer_is_up = true; rcv_l->bc_peer_is_up = true;
rcv_l->state = LINK_ESTABLISHED; rcv_l->state = LINK_ESTABLISHED;
tipc_link_bc_ack_rcv(rcv_l, ack, 0, NULL, xmitq); tipc_link_bc_ack_rcv(rcv_l, ack, 0, NULL, xmitq, NULL);
trace_tipc_link_reset(rcv_l, TIPC_DUMP_ALL, "bclink removed!"); trace_tipc_link_reset(rcv_l, TIPC_DUMP_ALL, "bclink removed!");
tipc_link_reset(rcv_l); tipc_link_reset(rcv_l);
rcv_l->state = LINK_RESET; rcv_l->state = LINK_RESET;
...@@ -2400,7 +2400,8 @@ int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr, ...@@ -2400,7 +2400,8 @@ int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
int tipc_link_bc_ack_rcv(struct tipc_link *r, u16 acked, u16 gap, int tipc_link_bc_ack_rcv(struct tipc_link *r, u16 acked, u16 gap,
struct tipc_gap_ack_blks *ga, struct tipc_gap_ack_blks *ga,
struct sk_buff_head *xmitq) struct sk_buff_head *xmitq,
struct sk_buff_head *retrq)
{ {
struct tipc_link *l = r->bc_sndlink; struct tipc_link *l = r->bc_sndlink;
bool unused = false; bool unused = false;
...@@ -2413,7 +2414,7 @@ int tipc_link_bc_ack_rcv(struct tipc_link *r, u16 acked, u16 gap, ...@@ -2413,7 +2414,7 @@ int tipc_link_bc_ack_rcv(struct tipc_link *r, u16 acked, u16 gap,
return 0; return 0;
trace_tipc_link_bc_ack(r, acked, gap, &l->transmq); trace_tipc_link_bc_ack(r, acked, gap, &l->transmq);
tipc_link_advance_transmq(l, r, acked, gap, ga, xmitq, &unused, &rc); tipc_link_advance_transmq(l, r, acked, gap, ga, retrq, &unused, &rc);
tipc_link_advance_backlog(l, xmitq); tipc_link_advance_backlog(l, xmitq);
if (unlikely(!skb_queue_empty(&l->wakeupq))) if (unlikely(!skb_queue_empty(&l->wakeupq)))
...@@ -2447,7 +2448,8 @@ int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb, ...@@ -2447,7 +2448,8 @@ int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
return 0; return 0;
if (dnode == tipc_own_addr(l->net)) { if (dnode == tipc_own_addr(l->net)) {
rc = tipc_link_bc_ack_rcv(l, acked, to - acked, NULL, xmitq); rc = tipc_link_bc_ack_rcv(l, acked, to - acked, NULL, xmitq,
xmitq);
l->stats.recv_nacks++; l->stats.recv_nacks++;
return rc; return rc;
} }
......
...@@ -147,7 +147,8 @@ u16 tipc_get_gap_ack_blks(struct tipc_gap_ack_blks **ga, struct tipc_link *l, ...@@ -147,7 +147,8 @@ u16 tipc_get_gap_ack_blks(struct tipc_gap_ack_blks **ga, struct tipc_link *l,
struct tipc_msg *hdr, bool uc); struct tipc_msg *hdr, bool uc);
int tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, u16 gap, int tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, u16 gap,
struct tipc_gap_ack_blks *ga, struct tipc_gap_ack_blks *ga,
struct sk_buff_head *xmitq); struct sk_buff_head *xmitq,
struct sk_buff_head *retrq);
void tipc_link_build_bc_sync_msg(struct tipc_link *l, void tipc_link_build_bc_sync_msg(struct tipc_link *l,
struct sk_buff_head *xmitq); struct sk_buff_head *xmitq);
void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr); void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr);
......
...@@ -1772,7 +1772,7 @@ static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr, ...@@ -1772,7 +1772,7 @@ static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
struct tipc_link *ucl; struct tipc_link *ucl;
int rc; int rc;
rc = tipc_bcast_sync_rcv(n->net, n->bc_entry.link, hdr); rc = tipc_bcast_sync_rcv(n->net, n->bc_entry.link, hdr, xmitq);
if (rc & TIPC_LINK_DOWN_EVT) { if (rc & TIPC_LINK_DOWN_EVT) {
tipc_node_reset_links(n); tipc_node_reset_links(n);
......
...@@ -36,7 +36,7 @@ ...@@ -36,7 +36,7 @@
#include "core.h" #include "core.h"
#include "trace.h" #include "trace.h"
#include "crypto.h" #include "crypto.h"
#include "bcast.h"
#include <linux/sysctl.h> #include <linux/sysctl.h>
static struct ctl_table_header *tipc_ctl_hdr; static struct ctl_table_header *tipc_ctl_hdr;
...@@ -75,6 +75,13 @@ static struct ctl_table tipc_table[] = { ...@@ -75,6 +75,13 @@ static struct ctl_table tipc_table[] = {
.extra1 = SYSCTL_ONE, .extra1 = SYSCTL_ONE,
}, },
#endif #endif
{
.procname = "bc_retruni",
.data = &sysctl_tipc_bc_retruni,
.maxlen = sizeof(sysctl_tipc_bc_retruni),
.mode = 0644,
.proc_handler = proc_doulongvec_minmax,
},
{} {}
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment