Commit 2af5ae37 authored by Jon Paul Maloy's avatar Jon Paul Maloy Committed by David S. Miller

tipc: clean up unused code and structures

After the previous changes in this series, we can now remove some
unused code and structures, both in the broadcast, link aggregation
and link code.

There are no functional changes in this commit.
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Reviewed-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent c49a0a84
...@@ -43,72 +43,23 @@ ...@@ -43,72 +43,23 @@
#include "link.h" #include "link.h"
#include "node.h" #include "node.h"
#define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */
#define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */ #define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */
#define BCLINK_WIN_MIN 32 /* bcast minimum link window size */ #define BCLINK_WIN_MIN 32 /* bcast minimum link window size */
const char tipc_bclink_name[] = "broadcast-link"; const char tipc_bclink_name[] = "broadcast-link";
/** /**
* struct tipc_bcbearer_pair - a pair of bearers used by broadcast link * struct tipc_bc_base - base structure for keeping broadcast send state
* @primary: pointer to primary bearer
* @secondary: pointer to secondary bearer
*
* Bearers must have same priority and same set of reachable destinations
* to be paired.
*/
struct tipc_bcbearer_pair {
struct tipc_bearer *primary;
struct tipc_bearer *secondary;
};
#define BCBEARER MAX_BEARERS
/**
* struct tipc_bcbearer - bearer used by broadcast link
* @bearer: (non-standard) broadcast bearer structure
* @media: (non-standard) broadcast media structure
* @bpairs: array of bearer pairs
* @bpairs_temp: temporary array of bearer pairs used by tipc_bcbearer_sort()
* @remains: temporary node map used by tipc_bcbearer_send()
* @remains_new: temporary node map used tipc_bcbearer_send()
*
* Note: The fields labelled "temporary" are incorporated into the bearer
* to avoid consuming potentially limited stack space through the use of
* large local variables within multicast routines. Concurrent access is
* prevented through use of the spinlock "bcast_lock".
*/
struct tipc_bcbearer {
struct tipc_bearer bearer;
struct tipc_media media;
struct tipc_bcbearer_pair bpairs[MAX_BEARERS];
struct tipc_bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
struct tipc_node_map remains;
struct tipc_node_map remains_new;
};
/**
* struct tipc_bc_base - link used for broadcast messages
* @link: broadcast send link structure * @link: broadcast send link structure
* @node: (non-standard) node structure representing b'cast link's peer node * @inputq: data input queue; will only carry SOCK_WAKEUP messages
* @bcast_nodes: map of broadcast-capable nodes * @dest: array keeping number of reachable destinations per bearer
* @retransmit_to: node that most recently requested a retransmit * @primary_bearer: a bearer having links to all broadcast destinations, if any
* @dest_nnt: array indicating number of reachable destinations per bearer
* @bearers: array of bearers, sorted by number of reachable destinations
*
* Handles sequence numbering, fragmentation, bundling, etc.
*/ */
struct tipc_bc_base { struct tipc_bc_base {
struct tipc_link *link; struct tipc_link *link;
struct tipc_node node;
struct sk_buff_head arrvq;
struct sk_buff_head inputq; struct sk_buff_head inputq;
struct sk_buff_head namedq;
int dests[MAX_BEARERS]; int dests[MAX_BEARERS];
int primary_bearer; int primary_bearer;
struct tipc_node_map bcast_nodes;
struct tipc_node *retransmit_to;
}; };
static struct tipc_bc_base *tipc_bc_base(struct net *net) static struct tipc_bc_base *tipc_bc_base(struct net *net)
...@@ -116,58 +67,11 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net) ...@@ -116,58 +67,11 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
return tipc_net(net)->bcbase; return tipc_net(net)->bcbase;
} }
/**
* tipc_nmap_equal - test for equality of node maps
*/
static int tipc_nmap_equal(struct tipc_node_map *nm_a,
struct tipc_node_map *nm_b)
{
return !memcmp(nm_a, nm_b, sizeof(*nm_a));
}
static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq);
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
struct tipc_node_map *nm_b,
struct tipc_node_map *nm_diff);
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
static void tipc_bclink_lock(struct net *net)
{
tipc_bcast_lock(net);
}
static void tipc_bclink_unlock(struct net *net)
{
tipc_bcast_unlock(net);
}
void tipc_bclink_input(struct net *net)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
tipc_sk_mcast_rcv(net, &tn->bcbase->arrvq, &tn->bcbase->inputq);
}
int tipc_bcast_get_mtu(struct net *net) int tipc_bcast_get_mtu(struct net *net)
{ {
return tipc_link_mtu(tipc_bc_sndlink(net)); return tipc_link_mtu(tipc_bc_sndlink(net));
} }
static u16 bcbuf_acks(struct sk_buff *skb)
{
return TIPC_SKB_CB(skb)->ackers;
}
static void bcbuf_set_acks(struct sk_buff *buf, u16 ackers)
{
TIPC_SKB_CB(buf)->ackers = ackers;
}
static void bcbuf_decr_acks(struct sk_buff *buf)
{
bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
}
/* tipc_bcbase_select_primary(): find a bearer with links to all destinations, /* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
* if any, and make it primary bearer * if any, and make it primary bearer
*/ */
...@@ -221,281 +125,6 @@ void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id) ...@@ -221,281 +125,6 @@ void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id)
tipc_bcast_unlock(net); tipc_bcast_unlock(net);
} }
static void bclink_set_last_sent(struct net *net)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_link *bcl = tn->bcl;
bcl->silent_intv_cnt = mod(bcl->snd_nxt - 1);
}
u32 tipc_bclink_get_last_sent(struct net *net)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
return tn->bcl->silent_intv_cnt;
}
static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
{
node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
seqno : node->bclink.last_sent;
}
/**
* tipc_bclink_retransmit_to - get most recent node to request retransmission
*
* Called with bclink_lock locked
*/
struct tipc_node *tipc_bclink_retransmit_to(struct net *net)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
return tn->bcbase->retransmit_to;
}
/**
* bclink_retransmit_pkt - retransmit broadcast packets
* @after: sequence number of last packet to *not* retransmit
* @to: sequence number of last packet to retransmit
*
* Called with bclink_lock locked
*/
static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
{
struct sk_buff *skb;
struct tipc_link *bcl = tn->bcl;
skb_queue_walk(&bcl->transmq, skb) {
if (more(buf_seqno(skb), after)) {
tipc_link_retransmit(bcl, skb, mod(to - after));
break;
}
}
}
/**
* bclink_prepare_wakeup - prepare users for wakeup after congestion
* @bcl: broadcast link
* @resultq: queue for users which can be woken up
* Move a number of waiting users, as permitted by available space in
* the send queue, from link wait queue to specified queue for wakeup
*/
static void bclink_prepare_wakeup(struct tipc_link *bcl, struct sk_buff_head *resultq)
{
int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
int imp, lim;
struct sk_buff *skb, *tmp;
skb_queue_walk_safe(&bcl->wakeupq, skb, tmp) {
imp = TIPC_SKB_CB(skb)->chain_imp;
lim = bcl->window + bcl->backlog[imp].limit;
pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
if ((pnd[imp] + bcl->backlog[imp].len) >= lim)
continue;
skb_unlink(skb, &bcl->wakeupq);
skb_queue_tail(resultq, skb);
}
}
/**
* tipc_bclink_wakeup_users - wake up pending users
*
* Called with no locks taken
*/
void tipc_bclink_wakeup_users(struct net *net)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_link *bcl = tn->bcl;
struct sk_buff_head resultq;
skb_queue_head_init(&resultq);
bclink_prepare_wakeup(bcl, &resultq);
tipc_sk_rcv(net, &resultq);
}
/**
* tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
* @n_ptr: node that sent acknowledgement info
* @acked: broadcast sequence # that has been acknowledged
*
* Node is locked, bclink_lock unlocked.
*/
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
{
struct sk_buff *skb, *tmp;
unsigned int released = 0;
struct net *net = n_ptr->net;
struct tipc_net *tn = net_generic(net, tipc_net_id);
if (unlikely(!n_ptr->bclink.recv_permitted))
return;
tipc_bclink_lock(net);
/* Bail out if tx queue is empty (no clean up is required) */
skb = skb_peek(&tn->bcl->transmq);
if (!skb)
goto exit;
/* Determine which messages need to be acknowledged */
if (acked == INVALID_LINK_SEQ) {
/*
* Contact with specified node has been lost, so need to
* acknowledge sent messages only (if other nodes still exist)
* or both sent and unsent messages (otherwise)
*/
if (tn->bcbase->bcast_nodes.count)
acked = tn->bcl->silent_intv_cnt;
else
acked = tn->bcl->snd_nxt;
} else {
/*
* Bail out if specified sequence number does not correspond
* to a message that has been sent and not yet acknowledged
*/
if (less(acked, buf_seqno(skb)) ||
less(tn->bcl->silent_intv_cnt, acked) ||
less_eq(acked, n_ptr->bclink.acked))
goto exit;
}
/* Skip over packets that node has previously acknowledged */
skb_queue_walk(&tn->bcl->transmq, skb) {
if (more(buf_seqno(skb), n_ptr->bclink.acked))
break;
}
/* Update packets that node is now acknowledging */
skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
if (more(buf_seqno(skb), acked))
break;
bcbuf_decr_acks(skb);
bclink_set_last_sent(net);
if (bcbuf_acks(skb) == 0) {
__skb_unlink(skb, &tn->bcl->transmq);
kfree_skb(skb);
released = 1;
}
}
n_ptr->bclink.acked = acked;
/* Try resolving broadcast link congestion, if necessary */
if (unlikely(skb_peek(&tn->bcl->backlogq))) {
tipc_link_push_packets(tn->bcl);
bclink_set_last_sent(net);
}
if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
exit:
tipc_bclink_unlock(net);
}
/**
* tipc_bclink_update_link_state - update broadcast link state
*
* RCU and node lock set
*/
void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
u32 last_sent)
{
struct sk_buff *buf;
struct net *net = n_ptr->net;
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_link *bcl = tn->bcl;
/* Ignore "stale" link state info */
if (less_eq(last_sent, n_ptr->bclink.last_in))
return;
/* Update link synchronization state; quit if in sync */
bclink_update_last_sent(n_ptr, last_sent);
/* This is a good location for statistical profiling */
bcl->stats.queue_sz_counts++;
bcl->stats.accu_queue_sz += skb_queue_len(&bcl->transmq);
if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
return;
/* Update out-of-sync state; quit if loss is still unconfirmed */
if ((++n_ptr->bclink.oos_state) == 1) {
if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
return;
n_ptr->bclink.oos_state++;
}
/* Don't NACK if one has been recently sent (or seen) */
if (n_ptr->bclink.oos_state & 0x1)
return;
/* Send NACK */
buf = tipc_buf_acquire(INT_H_SIZE);
if (buf) {
struct tipc_msg *msg = buf_msg(buf);
struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq);
u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
INT_H_SIZE, n_ptr->addr);
msg_set_non_seq(msg, 1);
msg_set_mc_netid(msg, tn->net_id);
msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
msg_set_bcgap_to(msg, to);
tipc_bclink_lock(net);
tipc_bearer_send(net, MAX_BEARERS, buf, NULL);
tn->bcl->stats.sent_nacks++;
tipc_bclink_unlock(net);
kfree_skb(buf);
n_ptr->bclink.oos_state++;
}
}
void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr)
{
u16 last = msg_last_bcast(hdr);
int mtyp = msg_type(hdr);
if (unlikely(msg_user(hdr) != LINK_PROTOCOL))
return;
if (mtyp == STATE_MSG) {
tipc_bclink_update_link_state(n, last);
return;
}
/* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization,
* and transfer synch info in LINK_PROTOCOL messages.
*/
if (tipc_node_is_up(n))
return;
if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG))
return;
n->bclink.last_sent = last;
n->bclink.last_in = last;
n->bclink.oos_state = 0;
}
/**
* bclink_peek_nack - monitor retransmission requests sent by other nodes
*
* Delay any upcoming NACK by this node if another node has already
* requested the first message this node is going to ask for.
*/
static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
{
struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg));
if (unlikely(!n_ptr))
return;
tipc_node_lock(n_ptr);
if (n_ptr->bclink.recv_permitted &&
(n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
(n_ptr->bclink.last_in == msg_bcgap_after(msg)))
n_ptr->bclink.oos_state = 2;
tipc_node_unlock(n_ptr);
tipc_node_put(n_ptr);
}
/* tipc_bcbase_xmit - broadcast a packet queue across one or more bearers /* tipc_bcbase_xmit - broadcast a packet queue across one or more bearers
* *
* Note that number of reachable destinations, as indicated in the dests[] * Note that number of reachable destinations, as indicated in the dests[]
...@@ -703,333 +332,6 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l) ...@@ -703,333 +332,6 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
tipc_sk_rcv(net, inputq); tipc_sk_rcv(net, inputq);
} }
/**
* bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
*
* Called with both sending node's lock and bclink_lock taken.
*/
static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
{
struct tipc_net *tn = net_generic(node->net, tipc_net_id);
bclink_update_last_sent(node, seqno);
node->bclink.last_in = seqno;
node->bclink.oos_state = 0;
tn->bcl->stats.recv_info++;
/*
* Unicast an ACK periodically, ensuring that
* all nodes in the cluster don't ACK at the same time
*/
if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {
tipc_link_proto_xmit(node_active_link(node, node->addr),
STATE_MSG, 0, 0, 0, 0);
tn->bcl->stats.sent_acks++;
}
}
/**
* tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
*
* RCU is locked, no other locks set
*/
void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_link *bcl = tn->bcl;
struct tipc_msg *msg = buf_msg(buf);
struct tipc_node *node;
u32 next_in;
u32 seqno;
int deferred = 0;
int pos = 0;
struct sk_buff *iskb;
struct sk_buff_head *arrvq, *inputq;
/* Screen out unwanted broadcast messages */
if (msg_mc_netid(msg) != tn->net_id)
goto exit;
node = tipc_node_find(net, msg_prevnode(msg));
if (unlikely(!node))
goto exit;
tipc_node_lock(node);
if (unlikely(!node->bclink.recv_permitted))
goto unlock;
/* Handle broadcast protocol message */
if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
if (msg_type(msg) != STATE_MSG)
goto unlock;
if (msg_destnode(msg) == tn->own_addr) {
tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
tipc_bclink_lock(net);
bcl->stats.recv_nacks++;
tn->bcbase->retransmit_to = node;
bclink_retransmit_pkt(tn, msg_bcgap_after(msg),
msg_bcgap_to(msg));
tipc_bclink_unlock(net);
tipc_node_unlock(node);
} else {
tipc_node_unlock(node);
bclink_peek_nack(net, msg);
}
tipc_node_put(node);
goto exit;
}
/* Handle in-sequence broadcast message */
seqno = msg_seqno(msg);
next_in = mod(node->bclink.last_in + 1);
arrvq = &tn->bcbase->arrvq;
inputq = &tn->bcbase->inputq;
if (likely(seqno == next_in)) {
receive:
/* Deliver message to destination */
if (likely(msg_isdata(msg))) {
tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno);
spin_lock_bh(&inputq->lock);
__skb_queue_tail(arrvq, buf);
spin_unlock_bh(&inputq->lock);
node->action_flags |= TIPC_BCAST_MSG_EVT;
tipc_bclink_unlock(net);
tipc_node_unlock(node);
} else if (msg_user(msg) == MSG_BUNDLER) {
tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno);
bcl->stats.recv_bundles++;
bcl->stats.recv_bundled += msg_msgcnt(msg);
pos = 0;
while (tipc_msg_extract(buf, &iskb, &pos)) {
spin_lock_bh(&inputq->lock);
__skb_queue_tail(arrvq, iskb);
spin_unlock_bh(&inputq->lock);
}
node->action_flags |= TIPC_BCAST_MSG_EVT;
tipc_bclink_unlock(net);
tipc_node_unlock(node);
} else if (msg_user(msg) == MSG_FRAGMENTER) {
tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno);
tipc_buf_append(&node->bclink.reasm_buf, &buf);
if (unlikely(!buf && !node->bclink.reasm_buf)) {
tipc_bclink_unlock(net);
goto unlock;
}
bcl->stats.recv_fragments++;
if (buf) {
bcl->stats.recv_fragmented++;
msg = buf_msg(buf);
tipc_bclink_unlock(net);
goto receive;
}
tipc_bclink_unlock(net);
tipc_node_unlock(node);
} else {
tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno);
tipc_bclink_unlock(net);
tipc_node_unlock(node);
kfree_skb(buf);
}
buf = NULL;
/* Determine new synchronization state */
tipc_node_lock(node);
if (unlikely(!tipc_node_is_up(node)))
goto unlock;
if (node->bclink.last_in == node->bclink.last_sent)
goto unlock;
if (skb_queue_empty(&node->bclink.deferdq)) {
node->bclink.oos_state = 1;
goto unlock;
}
msg = buf_msg(skb_peek(&node->bclink.deferdq));
seqno = msg_seqno(msg);
next_in = mod(next_in + 1);
if (seqno != next_in)
goto unlock;
/* Take in-sequence message from deferred queue & deliver it */
buf = __skb_dequeue(&node->bclink.deferdq);
goto receive;
}
/* Handle out-of-sequence broadcast message */
if (less(next_in, seqno)) {
deferred = tipc_link_defer_pkt(&node->bclink.deferdq,
buf);
bclink_update_last_sent(node, seqno);
buf = NULL;
}
tipc_bclink_lock(net);
if (deferred)
bcl->stats.deferred_recv++;
else
bcl->stats.duplicates++;
tipc_bclink_unlock(net);
unlock:
tipc_node_unlock(node);
tipc_node_put(node);
exit:
kfree_skb(buf);
}
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
{
return (n_ptr->bclink.recv_permitted &&
(tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked));
}
/**
* tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
*
* Send packet over as many bearers as necessary to reach all nodes
* that have joined the broadcast link.
*
* Returns 0 (packet sent successfully) under all circumstances,
* since the broadcast link's pseudo-bearer never blocks
*/
static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
struct tipc_bearer *unused1,
struct tipc_media_addr *unused2)
{
int bp_index;
struct tipc_msg *msg = buf_msg(buf);
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_bcbearer *bcbearer = tn->bcbearer;
struct tipc_bc_base *bclink = tn->bcbase;
/* Prepare broadcast link message for reliable transmission,
* if first time trying to send it;
* preparation is skipped for broadcast link protocol messages
* since they are sent in an unreliable manner and don't need it
*/
if (likely(!msg_non_seq(buf_msg(buf)))) {
bcbuf_set_acks(buf, bclink->bcast_nodes.count);
msg_set_non_seq(msg, 1);
msg_set_mc_netid(msg, tn->net_id);
tn->bcl->stats.sent_info++;
if (WARN_ON(!bclink->bcast_nodes.count)) {
dump_stack();
return 0;
}
}
msg_set_mc_netid(msg, tn->net_id);
/* Send buffer over bearers until all targets reached */
bcbearer->remains = bclink->bcast_nodes;
for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
struct tipc_bearer *bp[2] = {p, s};
struct tipc_bearer *b = bp[msg_link_selector(msg)];
struct sk_buff *tbuf;
if (!p)
break; /* No more bearers to try */
if (!b)
b = p;
tipc_nmap_diff(&bcbearer->remains, &b->nodes,
&bcbearer->remains_new);
if (bcbearer->remains_new.count == bcbearer->remains.count)
continue; /* Nothing added by bearer pair */
if (bp_index == 0) {
/* Use original buffer for first bearer */
tipc_bearer_send(net, b->identity, buf, &b->bcast_addr);
} else {
/* Avoid concurrent buffer access */
tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
if (!tbuf)
break;
tipc_bearer_send(net, b->identity, tbuf,
&b->bcast_addr);
kfree_skb(tbuf); /* Bearer keeps a clone */
}
if (bcbearer->remains_new.count == 0)
break; /* All targets reached */
bcbearer->remains = bcbearer->remains_new;
}
return 0;
}
/**
* tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
*/
void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
u32 node, bool action)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_bcbearer *bcbearer = tn->bcbearer;
struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
struct tipc_bcbearer_pair *bp_curr;
struct tipc_bearer *b;
int b_index;
int pri;
tipc_bclink_lock(net);
if (action)
tipc_nmap_add(nm_ptr, node);
else
tipc_nmap_remove(nm_ptr, node);
/* Group bearers by priority (can assume max of two per priority) */
memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));
rcu_read_lock();
for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
b = rcu_dereference_rtnl(tn->bearer_list[b_index]);
if (!b || !b->nodes.count)
continue;
if (!bp_temp[b->priority].primary)
bp_temp[b->priority].primary = b;
else
bp_temp[b->priority].secondary = b;
}
rcu_read_unlock();
/* Create array of bearer pairs for broadcasting */
bp_curr = bcbearer->bpairs;
memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));
for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) {
if (!bp_temp[pri].primary)
continue;
bp_curr->primary = bp_temp[pri].primary;
if (bp_temp[pri].secondary) {
if (tipc_nmap_equal(&bp_temp[pri].primary->nodes,
&bp_temp[pri].secondary->nodes)) {
bp_curr->secondary = bp_temp[pri].secondary;
} else {
bp_curr++;
bp_curr->primary = bp_temp[pri].secondary;
}
}
bp_curr++;
}
tipc_bclink_unlock(net);
}
static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb, static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
struct tipc_stats *stats) struct tipc_stats *stats)
{ {
...@@ -1093,7 +395,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg) ...@@ -1093,7 +395,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
if (!bcl) if (!bcl)
return 0; return 0;
tipc_bclink_lock(net); tipc_bcast_lock(net);
hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family, hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
NLM_F_MULTI, TIPC_NL_LINK_GET); NLM_F_MULTI, TIPC_NL_LINK_GET);
...@@ -1128,7 +430,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg) ...@@ -1128,7 +430,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
if (err) if (err)
goto attr_msg_full; goto attr_msg_full;
tipc_bclink_unlock(net); tipc_bcast_unlock(net);
nla_nest_end(msg->skb, attrs); nla_nest_end(msg->skb, attrs);
genlmsg_end(msg->skb, hdr); genlmsg_end(msg->skb, hdr);
...@@ -1139,7 +441,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg) ...@@ -1139,7 +441,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
attr_msg_full: attr_msg_full:
nla_nest_cancel(msg->skb, attrs); nla_nest_cancel(msg->skb, attrs);
msg_full: msg_full:
tipc_bclink_unlock(net); tipc_bcast_unlock(net);
genlmsg_cancel(msg->skb, hdr); genlmsg_cancel(msg->skb, hdr);
return -EMSGSIZE; return -EMSGSIZE;
...@@ -1153,26 +455,25 @@ int tipc_bclink_reset_stats(struct net *net) ...@@ -1153,26 +455,25 @@ int tipc_bclink_reset_stats(struct net *net)
if (!bcl) if (!bcl)
return -ENOPROTOOPT; return -ENOPROTOOPT;
tipc_bclink_lock(net); tipc_bcast_lock(net);
memset(&bcl->stats, 0, sizeof(bcl->stats)); memset(&bcl->stats, 0, sizeof(bcl->stats));
tipc_bclink_unlock(net); tipc_bcast_unlock(net);
return 0; return 0;
} }
int tipc_bclink_set_queue_limits(struct net *net, u32 limit) static int tipc_bc_link_set_queue_limits(struct net *net, u32 limit)
{ {
struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_link *l = tipc_bc_sndlink(net);
struct tipc_link *bcl = tn->bcl;
if (!bcl) if (!l)
return -ENOPROTOOPT; return -ENOPROTOOPT;
if (limit < BCLINK_WIN_MIN) if (limit < BCLINK_WIN_MIN)
limit = BCLINK_WIN_MIN; limit = BCLINK_WIN_MIN;
if (limit > TIPC_MAX_LINK_WIN) if (limit > TIPC_MAX_LINK_WIN)
return -EINVAL; return -EINVAL;
tipc_bclink_lock(net); tipc_bcast_lock(net);
tipc_link_set_queue_limits(bcl, limit); tipc_link_set_queue_limits(l, limit);
tipc_bclink_unlock(net); tipc_bcast_unlock(net);
return 0; return 0;
} }
...@@ -1194,53 +495,34 @@ int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]) ...@@ -1194,53 +495,34 @@ int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[])
win = nla_get_u32(props[TIPC_NLA_PROP_WIN]); win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
return tipc_bclink_set_queue_limits(net, win); return tipc_bc_link_set_queue_limits(net, win);
} }
int tipc_bcast_init(struct net *net) int tipc_bcast_init(struct net *net)
{ {
struct tipc_net *tn = tipc_net(net); struct tipc_net *tn = tipc_net(net);
struct tipc_bcbearer *bcb = NULL;
struct tipc_bc_base *bb = NULL; struct tipc_bc_base *bb = NULL;
struct tipc_link *l = NULL; struct tipc_link *l = NULL;
bcb = kzalloc(sizeof(*bcb), GFP_ATOMIC);
if (!bcb)
goto enomem;
tn->bcbearer = bcb;
bcb->bearer.window = BCLINK_WIN_DEFAULT;
bcb->bearer.mtu = MAX_PKT_DEFAULT_MCAST;
bcb->bearer.identity = MAX_BEARERS;
bcb->bearer.media = &bcb->media;
bcb->media.send_msg = tipc_bcbearer_send;
sprintf(bcb->media.name, "tipc-broadcast");
strcpy(bcb->bearer.name, bcb->media.name);
bb = kzalloc(sizeof(*bb), GFP_ATOMIC); bb = kzalloc(sizeof(*bb), GFP_ATOMIC);
if (!bb) if (!bb)
goto enomem; goto enomem;
tn->bcbase = bb; tn->bcbase = bb;
__skb_queue_head_init(&bb->arrvq);
spin_lock_init(&tipc_net(net)->bclock); spin_lock_init(&tipc_net(net)->bclock);
bb->node.net = net;
if (!tipc_link_bc_create(net, 0, 0, if (!tipc_link_bc_create(net, 0, 0,
U16_MAX, U16_MAX,
BCLINK_WIN_DEFAULT, BCLINK_WIN_DEFAULT,
0, 0,
&bb->inputq, &bb->inputq,
&bb->namedq, NULL,
NULL, NULL,
&l)) &l))
goto enomem; goto enomem;
bb->link = l; bb->link = l;
tn->bcl = l; tn->bcl = l;
rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcb->bearer);
return 0; return 0;
enomem: enomem:
kfree(bcb);
kfree(bb); kfree(bb);
kfree(l); kfree(l);
return -ENOMEM; return -ENOMEM;
...@@ -1257,70 +539,7 @@ void tipc_bcast_stop(struct net *net) ...@@ -1257,70 +539,7 @@ void tipc_bcast_stop(struct net *net)
{ {
struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_net *tn = net_generic(net, tipc_net_id);
tipc_bclink_lock(net);
tipc_link_purge_queues(tn->bcl);
tipc_bclink_unlock(net);
RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL);
synchronize_net(); synchronize_net();
kfree(tn->bcbearer);
kfree(tn->bcbase); kfree(tn->bcbase);
kfree(tn->bcl); kfree(tn->bcl);
} }
/**
* tipc_nmap_add - add a node to a node map
*/
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
{
int n = tipc_node(node);
int w = n / WSIZE;
u32 mask = (1 << (n % WSIZE));
if ((nm_ptr->map[w] & mask) == 0) {
nm_ptr->count++;
nm_ptr->map[w] |= mask;
}
}
/**
* tipc_nmap_remove - remove a node from a node map
*/
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
{
int n = tipc_node(node);
int w = n / WSIZE;
u32 mask = (1 << (n % WSIZE));
if ((nm_ptr->map[w] & mask) != 0) {
nm_ptr->map[w] &= ~mask;
nm_ptr->count--;
}
}
/**
* tipc_nmap_diff - find differences between node maps
* @nm_a: input node map A
* @nm_b: input node map B
* @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
*/
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
struct tipc_node_map *nm_b,
struct tipc_node_map *nm_diff)
{
int stop = ARRAY_SIZE(nm_a->map);
int w;
int b;
u32 map;
memset(nm_diff, 0, sizeof(*nm_diff));
for (w = 0; w < stop; w++) {
map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]);
nm_diff->map[w] = map;
if (map != 0) {
for (b = 0 ; b < WSIZE; b++) {
if (map & (1 << b))
nm_diff->count++;
}
}
}
}
...@@ -52,28 +52,15 @@ void tipc_bcast_add_peer(struct net *net, struct tipc_link *l, ...@@ -52,28 +52,15 @@ void tipc_bcast_add_peer(struct net *net, struct tipc_link *l,
void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl); void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id); void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id); void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
struct tipc_node *tipc_bclink_retransmit_to(struct net *tn);
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
void tipc_bclink_rcv(struct net *net, struct sk_buff *buf);
u32 tipc_bclink_get_last_sent(struct net *net);
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr);
void tipc_bclink_update_link_state(struct tipc_node *node,
u32 last_sent);
void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
u32 node, bool action);
int tipc_bclink_reset_stats(struct net *net);
int tipc_bclink_set_queue_limits(struct net *net, u32 limit);
int tipc_bcast_get_mtu(struct net *net); int tipc_bcast_get_mtu(struct net *net);
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list); int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb); int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked); void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked);
void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l, void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
struct tipc_msg *hdr); struct tipc_msg *hdr);
void tipc_bclink_wakeup_users(struct net *net);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg); int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]); int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
void tipc_bclink_input(struct net *net); int tipc_bclink_reset_stats(struct net *net);
void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *msg);
static inline void tipc_bcast_lock(struct net *net) static inline void tipc_bcast_lock(struct net *net)
{ {
......
...@@ -440,25 +440,6 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *skb, ...@@ -440,25 +440,6 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *skb,
return 0; return 0;
} }
/* tipc_bearer_send- sends buffer to destination over bearer
*
* IMPORTANT:
* The media send routine must not alter the buffer being passed in
* as it may be needed for later retransmission!
*/
void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
struct tipc_media_addr *dest)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_bearer *b_ptr;
rcu_read_lock();
b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
if (likely(b_ptr))
b_ptr->media->send_msg(net, buf, b_ptr, dest);
rcu_read_unlock();
}
int tipc_bearer_mtu(struct net *net, u32 bearer_id) int tipc_bearer_mtu(struct net *net, u32 bearer_id)
{ {
int mtu = 0; int mtu = 0;
......
...@@ -216,8 +216,6 @@ struct tipc_media *tipc_media_find(const char *name); ...@@ -216,8 +216,6 @@ struct tipc_media *tipc_media_find(const char *name);
int tipc_bearer_setup(void); int tipc_bearer_setup(void);
void tipc_bearer_cleanup(void); void tipc_bearer_cleanup(void);
void tipc_bearer_stop(struct net *net); void tipc_bearer_stop(struct net *net);
void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
struct tipc_media_addr *dest);
int tipc_bearer_mtu(struct net *net, u32 bearer_id); int tipc_bearer_mtu(struct net *net, u32 bearer_id);
void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id, void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,
struct sk_buff *skb, struct sk_buff *skb,
......
...@@ -62,7 +62,6 @@ ...@@ -62,7 +62,6 @@
struct tipc_node; struct tipc_node;
struct tipc_bearer; struct tipc_bearer;
struct tipc_bcbearer;
struct tipc_bc_base; struct tipc_bc_base;
struct tipc_link; struct tipc_link;
struct tipc_name_table; struct tipc_name_table;
...@@ -94,7 +93,6 @@ struct tipc_net { ...@@ -94,7 +93,6 @@ struct tipc_net {
/* Broadcast link */ /* Broadcast link */
spinlock_t bclock; spinlock_t bclock;
struct tipc_bcbearer *bcbearer;
struct tipc_bc_base *bcbase; struct tipc_bc_base *bcbase;
struct tipc_link *bcl; struct tipc_link *bcl;
......
...@@ -244,7 +244,6 @@ static u32 link_own_addr(struct tipc_link *l) ...@@ -244,7 +244,6 @@ static u32 link_own_addr(struct tipc_link *l)
* @ownnode: identity of own node * @ownnode: identity of own node
* @peer: node id of peer node * @peer: node id of peer node
* @peer_caps: bitmap describing peer node capabilities * @peer_caps: bitmap describing peer node capabilities
* @maddr: media address to be used
* @bc_sndlink: the namespace global link used for broadcast sending * @bc_sndlink: the namespace global link used for broadcast sending
* @bc_rcvlink: the peer specific link used for broadcast reception * @bc_rcvlink: the peer specific link used for broadcast reception
* @inputq: queue to put messages ready for delivery * @inputq: queue to put messages ready for delivery
...@@ -257,7 +256,6 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id, ...@@ -257,7 +256,6 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
int tolerance, char net_plane, u32 mtu, int priority, int tolerance, char net_plane, u32 mtu, int priority,
int window, u32 session, u32 ownnode, u32 peer, int window, u32 session, u32 ownnode, u32 peer,
u16 peer_caps, u16 peer_caps,
struct tipc_media_addr *maddr,
struct tipc_link *bc_sndlink, struct tipc_link *bc_sndlink,
struct tipc_link *bc_rcvlink, struct tipc_link *bc_rcvlink,
struct sk_buff_head *inputq, struct sk_buff_head *inputq,
...@@ -286,7 +284,6 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id, ...@@ -286,7 +284,6 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
l->addr = peer; l->addr = peer;
l->peer_caps = peer_caps; l->peer_caps = peer_caps;
l->media_addr = maddr;
l->net = net; l->net = net;
l->peer_session = WILDCARD_SESSION; l->peer_session = WILDCARD_SESSION;
l->bearer_id = bearer_id; l->bearer_id = bearer_id;
...@@ -331,7 +328,7 @@ bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer, ...@@ -331,7 +328,7 @@ bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
struct tipc_link *l; struct tipc_link *l;
if (!tipc_link_create(net, "", MAX_BEARERS, 0, 'Z', mtu, 0, window, if (!tipc_link_create(net, "", MAX_BEARERS, 0, 'Z', mtu, 0, window,
0, ownnode, peer, peer_caps, NULL, bc_sndlink, 0, ownnode, peer, peer_caps, bc_sndlink,
NULL, inputq, namedq, link)) NULL, inputq, namedq, link))
return false; return false;
...@@ -662,38 +659,6 @@ void link_prepare_wakeup(struct tipc_link *l) ...@@ -662,38 +659,6 @@ void link_prepare_wakeup(struct tipc_link *l)
} }
} }
/**
* tipc_link_reset_fragments - purge link's inbound message fragments queue
* @l_ptr: pointer to link
*/
void tipc_link_reset_fragments(struct tipc_link *l_ptr)
{
kfree_skb(l_ptr->reasm_buf);
l_ptr->reasm_buf = NULL;
}
void tipc_link_purge_backlog(struct tipc_link *l)
{
__skb_queue_purge(&l->backlogq);
l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
}
/**
* tipc_link_purge_queues - purge all pkt queues associated with link
* @l_ptr: pointer to link
*/
void tipc_link_purge_queues(struct tipc_link *l_ptr)
{
__skb_queue_purge(&l_ptr->deferdq);
__skb_queue_purge(&l_ptr->transmq);
tipc_link_purge_backlog(l_ptr);
tipc_link_reset_fragments(l_ptr);
}
void tipc_link_reset(struct tipc_link *l) void tipc_link_reset(struct tipc_link *l)
{ {
/* Link is down, accept any session */ /* Link is down, accept any session */
...@@ -705,12 +670,16 @@ void tipc_link_reset(struct tipc_link *l) ...@@ -705,12 +670,16 @@ void tipc_link_reset(struct tipc_link *l)
/* Prepare for renewed mtu size negotiation */ /* Prepare for renewed mtu size negotiation */
l->mtu = l->advertised_mtu; l->mtu = l->advertised_mtu;
/* Clean up all queues: */ /* Clean up all queues and counters: */
__skb_queue_purge(&l->transmq); __skb_queue_purge(&l->transmq);
__skb_queue_purge(&l->deferdq); __skb_queue_purge(&l->deferdq);
skb_queue_splice_init(&l->wakeupq, l->inputq); skb_queue_splice_init(&l->wakeupq, l->inputq);
__skb_queue_purge(&l->backlogq);
tipc_link_purge_backlog(l); l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
kfree_skb(l->reasm_buf); kfree_skb(l->reasm_buf);
kfree_skb(l->failover_reasm_skb); kfree_skb(l->failover_reasm_skb);
l->reasm_buf = NULL; l->reasm_buf = NULL;
...@@ -726,74 +695,6 @@ void tipc_link_reset(struct tipc_link *l) ...@@ -726,74 +695,6 @@ void tipc_link_reset(struct tipc_link *l)
link_reset_statistics(l); link_reset_statistics(l);
} }
/**
* __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
* @link: link to use
* @list: chain of buffers containing message
*
* Consumes the buffer chain, except when returning an error code,
* Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
* Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
*/
int __tipc_link_xmit(struct net *net, struct tipc_link *link,
struct sk_buff_head *list)
{
struct tipc_msg *msg = buf_msg(skb_peek(list));
unsigned int maxwin = link->window;
unsigned int i, imp = msg_importance(msg);
uint mtu = link->mtu;
u16 ack = mod(link->rcv_nxt - 1);
u16 seqno = link->snd_nxt;
u16 bc_ack = link->bc_rcvlink->rcv_nxt - 1;
struct tipc_media_addr *addr = link->media_addr;
struct sk_buff_head *transmq = &link->transmq;
struct sk_buff_head *backlogq = &link->backlogq;
struct sk_buff *skb, *bskb;
/* Match msg importance against this and all higher backlog limits: */
for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
if (unlikely(link->backlog[i].len >= link->backlog[i].limit))
return link_schedule_user(link, list);
}
if (unlikely(msg_size(msg) > mtu))
return -EMSGSIZE;
/* Prepare each packet for sending, and add to relevant queue: */
while (skb_queue_len(list)) {
skb = skb_peek(list);
msg = buf_msg(skb);
msg_set_seqno(msg, seqno);
msg_set_ack(msg, ack);
msg_set_bcast_ack(msg, bc_ack);
if (likely(skb_queue_len(transmq) < maxwin)) {
__skb_dequeue(list);
__skb_queue_tail(transmq, skb);
tipc_bearer_send(net, link->bearer_id, skb, addr);
link->rcv_unacked = 0;
seqno++;
continue;
}
if (tipc_msg_bundle(skb_peek_tail(backlogq), msg, mtu)) {
kfree_skb(__skb_dequeue(list));
link->stats.sent_bundled++;
continue;
}
if (tipc_msg_make_bundle(&bskb, msg, mtu, link->addr)) {
kfree_skb(__skb_dequeue(list));
__skb_queue_tail(backlogq, bskb);
link->backlog[msg_importance(buf_msg(bskb))].len++;
link->stats.sent_bundled++;
link->stats.sent_bundles++;
continue;
}
link->backlog[imp].len += skb_queue_len(list);
skb_queue_splice_tail_init(list, backlogq);
}
link->snd_nxt = seqno;
return 0;
}
/** /**
* tipc_link_xmit(): enqueue buffer list according to queue situation * tipc_link_xmit(): enqueue buffer list according to queue situation
* @link: link to use * @link: link to use
...@@ -867,40 +768,6 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, ...@@ -867,40 +768,6 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
return 0; return 0;
} }
/*
* tipc_link_push_packets - push unsent packets to bearer
*
* Push out the unsent messages of a link where congestion
* has abated. Node is locked.
*
* Called with node locked
*/
void tipc_link_push_packets(struct tipc_link *link)
{
struct sk_buff *skb;
struct tipc_msg *msg;
u16 seqno = link->snd_nxt;
u16 ack = mod(link->rcv_nxt - 1);
while (skb_queue_len(&link->transmq) < link->window) {
skb = __skb_dequeue(&link->backlogq);
if (!skb)
break;
TIPC_SKB_CB(skb)->ackers = link->ackers;
msg = buf_msg(skb);
link->backlog[msg_importance(msg)].len--;
msg_set_ack(msg, ack);
msg_set_seqno(msg, seqno);
seqno = mod(seqno + 1);
/* msg_set_bcast_ack(msg, link->owner->bclink.last_in); */
link->rcv_unacked = 0;
__skb_queue_tail(&link->transmq, skb);
tipc_bearer_send(link->net, link->bearer_id,
skb, link->media_addr);
}
link->snd_nxt = seqno;
}
void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
{ {
struct sk_buff *skb, *_skb; struct sk_buff *skb, *_skb;
...@@ -943,40 +810,6 @@ static void link_retransmit_failure(struct tipc_link *l, struct sk_buff *skb) ...@@ -943,40 +810,6 @@ static void link_retransmit_failure(struct tipc_link *l, struct sk_buff *skb)
msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr)); msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr));
} }
void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
u32 retransmits)
{
struct tipc_msg *msg;
if (!skb)
return;
msg = buf_msg(skb);
/* Detect repeated retransmit failures */
if (l_ptr->last_retransm == msg_seqno(msg)) {
if (++l_ptr->stale_count > 100) {
link_retransmit_failure(l_ptr, skb);
return;
}
} else {
l_ptr->last_retransm = msg_seqno(msg);
l_ptr->stale_count = 1;
}
skb_queue_walk_from(&l_ptr->transmq, skb) {
if (!retransmits)
break;
msg = buf_msg(skb);
msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1));
/* msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); */
tipc_bearer_send(l_ptr->net, l_ptr->bearer_id, skb,
l_ptr->media_addr);
retransmits--;
l_ptr->stats.retransmitted++;
}
}
int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to, int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
struct sk_buff_head *xmitq) struct sk_buff_head *xmitq)
{ {
...@@ -1249,45 +1082,6 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, ...@@ -1249,45 +1082,6 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
return rc; return rc;
} }
/**
* tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
*
* Returns increase in queue length (i.e. 0 or 1)
*/
u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
{
struct sk_buff *skb1;
u16 seq_no = buf_seqno(skb);
/* Empty queue ? */
if (skb_queue_empty(list)) {
__skb_queue_tail(list, skb);
return 1;
}
/* Last ? */
if (less(buf_seqno(skb_peek_tail(list)), seq_no)) {
__skb_queue_tail(list, skb);
return 1;
}
/* Locate insertion point in queue, then insert; discard if duplicate */
skb_queue_walk(list, skb1) {
u16 curr_seqno = buf_seqno(skb1);
if (seq_no == curr_seqno) {
kfree_skb(skb);
return 0;
}
if (less(seq_no, curr_seqno))
break;
}
__skb_queue_before(list, skb1, skb);
return 1;
}
/* /*
* Send protocol message to the other endpoint. * Send protocol message to the other endpoint.
*/ */
......
...@@ -224,7 +224,6 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id, ...@@ -224,7 +224,6 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
int tolerance, char net_plane, u32 mtu, int priority, int tolerance, char net_plane, u32 mtu, int priority,
int window, u32 session, u32 ownnode, u32 peer, int window, u32 session, u32 ownnode, u32 peer,
u16 peer_caps, u16 peer_caps,
struct tipc_media_addr *maddr,
struct tipc_link *bc_sndlink, struct tipc_link *bc_sndlink,
struct tipc_link *bc_rcvlink, struct tipc_link *bc_rcvlink,
struct sk_buff_head *inputq, struct sk_buff_head *inputq,
...@@ -249,22 +248,10 @@ bool tipc_link_is_synching(struct tipc_link *l); ...@@ -249,22 +248,10 @@ bool tipc_link_is_synching(struct tipc_link *l);
bool tipc_link_is_failingover(struct tipc_link *l); bool tipc_link_is_failingover(struct tipc_link *l);
bool tipc_link_is_blocked(struct tipc_link *l); bool tipc_link_is_blocked(struct tipc_link *l);
void tipc_link_set_active(struct tipc_link *l, bool active); void tipc_link_set_active(struct tipc_link *l, bool active);
void tipc_link_purge_queues(struct tipc_link *l_ptr);
void tipc_link_purge_backlog(struct tipc_link *l);
void tipc_link_reset(struct tipc_link *l_ptr); void tipc_link_reset(struct tipc_link *l_ptr);
int __tipc_link_xmit(struct net *net, struct tipc_link *link,
struct sk_buff_head *list);
int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list, int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list,
struct sk_buff_head *xmitq); struct sk_buff_head *xmitq);
void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, void tipc_link_set_queue_limits(struct tipc_link *l, u32 window);
u32 gap, u32 tolerance, u32 priority);
void tipc_link_push_packets(struct tipc_link *l_ptr);
u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *buf);
void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window);
void tipc_link_retransmit(struct tipc_link *l_ptr,
struct sk_buff *start, u32 retransmits);
struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
const struct sk_buff *skb);
int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb); int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb);
int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info); int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info);
......
...@@ -606,7 +606,7 @@ void tipc_node_check_dest(struct net *net, u32 onode, ...@@ -606,7 +606,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
b->net_plane, b->mtu, b->priority, b->net_plane, b->mtu, b->priority,
b->window, mod(tipc_net(net)->random), b->window, mod(tipc_net(net)->random),
tipc_own_addr(net), onode, tipc_own_addr(net), onode,
n->capabilities, &le->maddr, n->capabilities,
tipc_bc_sndlink(n->net), n->bc_entry.link, tipc_bc_sndlink(n->net), n->bc_entry.link,
&le->inputq, &le->inputq,
&n->bc_entry.namedq, &l)) { &n->bc_entry.namedq, &l)) {
...@@ -943,18 +943,13 @@ void tipc_node_unlock(struct tipc_node *node) ...@@ -943,18 +943,13 @@ void tipc_node_unlock(struct tipc_node *node)
publ_list = &node->publ_list; publ_list = &node->publ_list;
node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP | node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP | TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT |
TIPC_BCAST_RESET);
spin_unlock_bh(&node->lock); spin_unlock_bh(&node->lock);
if (flags & TIPC_NOTIFY_NODE_DOWN) if (flags & TIPC_NOTIFY_NODE_DOWN)
tipc_publ_notify(net, publ_list, addr); tipc_publ_notify(net, publ_list, addr);
if (flags & TIPC_WAKEUP_BCAST_USERS)
tipc_bclink_wakeup_users(net);
if (flags & TIPC_NOTIFY_NODE_UP) if (flags & TIPC_NOTIFY_NODE_UP)
tipc_named_node_up(net, addr); tipc_named_node_up(net, addr);
...@@ -966,11 +961,6 @@ void tipc_node_unlock(struct tipc_node *node) ...@@ -966,11 +961,6 @@ void tipc_node_unlock(struct tipc_node *node)
tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr, tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
link_id, addr); link_id, addr);
if (flags & TIPC_BCAST_MSG_EVT)
tipc_bclink_input(net);
if (flags & TIPC_BCAST_RESET)
tipc_node_reset_links(node);
} }
/* Caller should hold node lock for the passed node */ /* Caller should hold node lock for the passed node */
......
...@@ -55,11 +55,8 @@ ...@@ -55,11 +55,8 @@
enum { enum {
TIPC_NOTIFY_NODE_DOWN = (1 << 3), TIPC_NOTIFY_NODE_DOWN = (1 << 3),
TIPC_NOTIFY_NODE_UP = (1 << 4), TIPC_NOTIFY_NODE_UP = (1 << 4),
TIPC_WAKEUP_BCAST_USERS = (1 << 5),
TIPC_NOTIFY_LINK_UP = (1 << 6), TIPC_NOTIFY_LINK_UP = (1 << 6),
TIPC_NOTIFY_LINK_DOWN = (1 << 7), TIPC_NOTIFY_LINK_DOWN = (1 << 7)
TIPC_BCAST_MSG_EVT = (1 << 9),
TIPC_BCAST_RESET = (1 << 10)
}; };
/* Optional capabilities supported by this code version /* Optional capabilities supported by this code version
...@@ -70,29 +67,6 @@ enum { ...@@ -70,29 +67,6 @@ enum {
#define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH #define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH
/**
* struct tipc_node_bclink - TIPC node bclink structure
* @acked: sequence # of last outbound b'cast message acknowledged by node
* @last_in: sequence # of last in-sequence b'cast message received from node
* @last_sent: sequence # of last b'cast message sent by node
* @oos_state: state tracker for handling OOS b'cast messages
* @deferred_queue: deferred queue saved OOS b'cast message received from node
* @reasm_buf: broadcast reassembly queue head from node
* @inputq_map: bitmap indicating which inqueues should be kicked
* @recv_permitted: true if node is allowed to receive b'cast messages
*/
struct tipc_node_bclink {
u32 acked;
u32 last_in;
u32 last_sent;
u32 oos_state;
u32 deferred_size;
struct sk_buff_head deferdq;
struct sk_buff *reasm_buf;
struct sk_buff_head namedq;
bool recv_permitted;
};
struct tipc_link_entry { struct tipc_link_entry {
struct tipc_link *link; struct tipc_link *link;
u32 mtu; u32 mtu;
...@@ -120,7 +94,6 @@ struct tipc_bclink_entry { ...@@ -120,7 +94,6 @@ struct tipc_bclink_entry {
* @active_links: bearer ids of active links, used as index into links[] array * @active_links: bearer ids of active links, used as index into links[] array
* @links: array containing references to all links to node * @links: array containing references to all links to node
* @action_flags: bit mask of different types of node actions * @action_flags: bit mask of different types of node actions
* @bclink: broadcast-related info
* @state: connectivity state vs peer node * @state: connectivity state vs peer node
* @sync_point: sequence number where synch/failover is finished * @sync_point: sequence number where synch/failover is finished
* @list: links to adjacent nodes in sorted list of cluster's nodes * @list: links to adjacent nodes in sorted list of cluster's nodes
...@@ -142,7 +115,6 @@ struct tipc_node { ...@@ -142,7 +115,6 @@ struct tipc_node {
struct tipc_link_entry links[MAX_BEARERS]; struct tipc_link_entry links[MAX_BEARERS];
struct tipc_bclink_entry bc_entry; struct tipc_bclink_entry bc_entry;
int action_flags; int action_flags;
struct tipc_node_bclink bclink;
struct list_head list; struct list_head list;
int state; int state;
u16 sync_point; u16 sync_point;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment