Commit dbdf6d24 authored by Jon Paul Maloy's avatar Jon Paul Maloy Committed by David S. Miller

tipc: make name table distributor use new send function

In a previous commit series ("tipc: new unicast transmission code")
we introduced a new message sending function, tipc_link_xmit2(),
and moved the unicast data users over to use that function. We now
let the internal name table distributor do the same.

The interaction between the name distributor and the node/link
layer also becomes significantly simpler, so we can eliminate
the function tipc_link_names_xmit().
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Reviewed-by: default avatarErik Hugne <erik.hugne@ericsson.com>
Reviewed-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent a9f559c3
...@@ -1032,47 +1032,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf) ...@@ -1032,47 +1032,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
kfree_skb(buf); kfree_skb(buf);
} }
/*
* tipc_link_names_xmit - send name table entries to new neighbor
*
* Send routine for bulk delivery of name table messages when contact
* with a new neighbor occurs. No link congestion checking is performed
* because name table messages *must* be delivered. The messages must be
* small enough not to require fragmentation.
* Called without any locks held.
*/
void tipc_link_names_xmit(struct list_head *message_list, u32 dest)
{
struct tipc_node *n_ptr;
struct tipc_link *l_ptr;
struct sk_buff *buf;
struct sk_buff *temp_buf;
if (list_empty(message_list))
return;
n_ptr = tipc_node_find(dest);
if (n_ptr) {
tipc_node_lock(n_ptr);
l_ptr = n_ptr->active_links[0];
if (l_ptr) {
/* convert circular list to linear list */
((struct sk_buff *)message_list->prev)->next = NULL;
link_add_chain_to_outqueue(l_ptr,
(struct sk_buff *)message_list->next, 0);
tipc_link_push_queue(l_ptr);
INIT_LIST_HEAD(message_list);
}
tipc_node_unlock(n_ptr);
}
/* discard the messages if they couldn't be sent */
list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
list_del((struct list_head *)buf);
kfree_skb(buf);
}
}
/* /*
* tipc_link_push_packet: Push one unsent packet to the media * tipc_link_push_packet: Push one unsent packet to the media
*/ */
......
...@@ -228,7 +228,6 @@ void tipc_link_reset(struct tipc_link *l_ptr); ...@@ -228,7 +228,6 @@ void tipc_link_reset(struct tipc_link *l_ptr);
void tipc_link_reset_list(unsigned int bearer_id); void tipc_link_reset_list(unsigned int bearer_id);
int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector); int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector);
int tipc_link_xmit2(struct sk_buff *buf, u32 dest, u32 selector); int tipc_link_xmit2(struct sk_buff *buf, u32 dest, u32 selector);
void tipc_link_names_xmit(struct list_head *message_list, u32 dest);
int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf); int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf); int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf);
int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf); int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
......
...@@ -101,24 +101,22 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest) ...@@ -101,24 +101,22 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)
void named_cluster_distribute(struct sk_buff *buf) void named_cluster_distribute(struct sk_buff *buf)
{ {
struct sk_buff *buf_copy; struct sk_buff *obuf;
struct tipc_node *n_ptr; struct tipc_node *node;
struct tipc_link *l_ptr; u32 dnode;
rcu_read_lock(); rcu_read_lock();
list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { list_for_each_entry_rcu(node, &tipc_node_list, list) {
tipc_node_lock(n_ptr); dnode = node->addr;
l_ptr = n_ptr->active_links[n_ptr->addr & 1]; if (in_own_node(dnode))
if (l_ptr) { continue;
buf_copy = skb_copy(buf, GFP_ATOMIC); if (!tipc_node_active_links(node))
if (!buf_copy) { continue;
tipc_node_unlock(n_ptr); obuf = skb_copy(buf, GFP_ATOMIC);
break; if (!obuf)
} break;
msg_set_destnode(buf_msg(buf_copy), n_ptr->addr); msg_set_destnode(buf_msg(obuf), dnode);
__tipc_link_xmit(l_ptr, buf_copy); tipc_link_xmit2(obuf, dnode, dnode);
}
tipc_node_unlock(n_ptr);
} }
rcu_read_unlock(); rcu_read_unlock();
...@@ -175,34 +173,44 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ) ...@@ -175,34 +173,44 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ)
return buf; return buf;
} }
/* /**
* named_distribute - prepare name info for bulk distribution to another node * named_distribute - prepare name info for bulk distribution to another node
* @msg_list: list of messages (buffers) to be returned from this function
* @dnode: node to be updated
* @pls: linked list of publication items to be packed into buffer chain
*/ */
static void named_distribute(struct list_head *message_list, u32 node, static void named_distribute(struct list_head *msg_list, u32 dnode,
struct publ_list *pls, u32 max_item_buf) struct publ_list *pls)
{ {
struct publication *publ; struct publication *publ;
struct sk_buff *buf = NULL; struct sk_buff *buf = NULL;
struct distr_item *item = NULL; struct distr_item *item = NULL;
u32 left = 0; uint dsz = pls->size * ITEM_SIZE;
u32 rest = pls->size * ITEM_SIZE; uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
uint rem = dsz;
uint msg_rem = 0;
list_for_each_entry(publ, &pls->list, local_list) { list_for_each_entry(publ, &pls->list, local_list) {
/* Prepare next buffer: */
if (!buf) { if (!buf) {
left = (rest <= max_item_buf) ? rest : max_item_buf; msg_rem = min_t(uint, rem, msg_dsz);
rest -= left; rem -= msg_rem;
buf = named_prepare_buf(PUBLICATION, left, node); buf = named_prepare_buf(PUBLICATION, msg_rem, dnode);
if (!buf) { if (!buf) {
pr_warn("Bulk publication failure\n"); pr_warn("Bulk publication failure\n");
return; return;
} }
item = (struct distr_item *)msg_data(buf_msg(buf)); item = (struct distr_item *)msg_data(buf_msg(buf));
} }
/* Pack publication into message: */
publ_to_item(item, publ); publ_to_item(item, publ);
item++; item++;
left -= ITEM_SIZE; msg_rem -= ITEM_SIZE;
if (!left) {
list_add_tail((struct list_head *)buf, message_list); /* Append full buffer to list: */
if (!msg_rem) {
list_add_tail((struct list_head *)buf, msg_list);
buf = NULL; buf = NULL;
} }
} }
...@@ -211,16 +219,20 @@ static void named_distribute(struct list_head *message_list, u32 node, ...@@ -211,16 +219,20 @@ static void named_distribute(struct list_head *message_list, u32 node,
/** /**
* tipc_named_node_up - tell specified node about all publications by this node * tipc_named_node_up - tell specified node about all publications by this node
*/ */
void tipc_named_node_up(u32 max_item_buf, u32 node) void tipc_named_node_up(u32 dnode)
{ {
LIST_HEAD(message_list); LIST_HEAD(msg_list);
struct sk_buff *buf_chain;
read_lock_bh(&tipc_nametbl_lock); read_lock_bh(&tipc_nametbl_lock);
named_distribute(&message_list, node, &publ_cluster, max_item_buf); named_distribute(&msg_list, dnode, &publ_cluster);
named_distribute(&message_list, node, &publ_zone, max_item_buf); named_distribute(&msg_list, dnode, &publ_zone);
read_unlock_bh(&tipc_nametbl_lock); read_unlock_bh(&tipc_nametbl_lock);
tipc_link_names_xmit(&message_list, node); /* Convert circular list to linear list and send: */
buf_chain = (struct sk_buff *)msg_list.next;
((struct sk_buff *)msg_list.prev)->next = NULL;
tipc_link_xmit2(buf_chain, dnode, dnode);
} }
/** /**
......
...@@ -70,7 +70,7 @@ struct distr_item { ...@@ -70,7 +70,7 @@ struct distr_item {
struct sk_buff *tipc_named_publish(struct publication *publ); struct sk_buff *tipc_named_publish(struct publication *publ);
struct sk_buff *tipc_named_withdraw(struct publication *publ); struct sk_buff *tipc_named_withdraw(struct publication *publ);
void named_cluster_distribute(struct sk_buff *buf); void named_cluster_distribute(struct sk_buff *buf);
void tipc_named_node_up(u32 max_item_buf, u32 node); void tipc_named_node_up(u32 dnode);
void tipc_named_rcv(struct sk_buff *buf); void tipc_named_rcv(struct sk_buff *buf);
void tipc_named_reinit(void); void tipc_named_reinit(void);
......
...@@ -474,8 +474,6 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len) ...@@ -474,8 +474,6 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len)
void tipc_node_unlock(struct tipc_node *node) void tipc_node_unlock(struct tipc_node *node)
{ {
LIST_HEAD(nsub_list); LIST_HEAD(nsub_list);
struct tipc_link *link;
int pkt_sz = 0;
u32 addr = 0; u32 addr = 0;
if (likely(!node->action_flags)) { if (likely(!node->action_flags)) {
...@@ -488,18 +486,13 @@ void tipc_node_unlock(struct tipc_node *node) ...@@ -488,18 +486,13 @@ void tipc_node_unlock(struct tipc_node *node)
node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN; node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN;
} }
if (node->action_flags & TIPC_NOTIFY_NODE_UP) { if (node->action_flags & TIPC_NOTIFY_NODE_UP) {
link = node->active_links[0];
node->action_flags &= ~TIPC_NOTIFY_NODE_UP; node->action_flags &= ~TIPC_NOTIFY_NODE_UP;
if (link) { addr = node->addr;
pkt_sz = ((link->max_pkt - INT_H_SIZE) / ITEM_SIZE) *
ITEM_SIZE;
addr = node->addr;
}
} }
spin_unlock_bh(&node->lock); spin_unlock_bh(&node->lock);
if (!list_empty(&nsub_list)) if (!list_empty(&nsub_list))
tipc_nodesub_notify(&nsub_list); tipc_nodesub_notify(&nsub_list);
if (pkt_sz) if (addr)
tipc_named_node_up(pkt_sz, addr); tipc_named_node_up(addr);
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment