Commit eab8c045 authored by Ying Xue's avatar Ying Xue Committed by David S. Miller

tipc: move the delivery of named messages out of nametbl lock

Commit a89778d8 ("tipc: add support
for link state subscriptions") introduced below possible deadlock
scenario:

       CPU0                          CPU1
T0:   tipc_publish()                 link_timeout()
T1:   tipc_nametbl_publish()         [grab node lock]*
T2:   [grab nametbl write lock]*     link_state_event()
T3:   named_cluster_distribute()     link_activate()
T4:   [grab node lock]*              tipc_node_link_up()
T5:                                  tipc_nametbl_publish()
T6:                                  [grab nametble write lock]*

The opposite order of holding nametbl write lock and node lock on
above two different paths may result in a deadlock. If we move the
the delivery of named messages via link out of name nametbl lock,
the reverse order of holding locks will be eliminated, as a result,
the deadlock will be killed as well.
Signed-off-by: default avatarYing Xue <ying.xue@windriver.com>
Reviewed-by: default avatarErik Hugne <erik.hugne@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent d7bb74c3
...@@ -127,7 +127,7 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest) ...@@ -127,7 +127,7 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)
return buf; return buf;
} }
static void named_cluster_distribute(struct sk_buff *buf) void named_cluster_distribute(struct sk_buff *buf)
{ {
struct sk_buff *buf_copy; struct sk_buff *buf_copy;
struct tipc_node *n_ptr; struct tipc_node *n_ptr;
...@@ -156,7 +156,7 @@ static void named_cluster_distribute(struct sk_buff *buf) ...@@ -156,7 +156,7 @@ static void named_cluster_distribute(struct sk_buff *buf)
/** /**
* tipc_named_publish - tell other nodes about a new publication by this node * tipc_named_publish - tell other nodes about a new publication by this node
*/ */
void tipc_named_publish(struct publication *publ) struct sk_buff *tipc_named_publish(struct publication *publ)
{ {
struct sk_buff *buf; struct sk_buff *buf;
struct distr_item *item; struct distr_item *item;
...@@ -165,23 +165,23 @@ void tipc_named_publish(struct publication *publ) ...@@ -165,23 +165,23 @@ void tipc_named_publish(struct publication *publ)
publ_lists[publ->scope]->size++; publ_lists[publ->scope]->size++;
if (publ->scope == TIPC_NODE_SCOPE) if (publ->scope == TIPC_NODE_SCOPE)
return; return NULL;
buf = named_prepare_buf(PUBLICATION, ITEM_SIZE, 0); buf = named_prepare_buf(PUBLICATION, ITEM_SIZE, 0);
if (!buf) { if (!buf) {
pr_warn("Publication distribution failure\n"); pr_warn("Publication distribution failure\n");
return; return NULL;
} }
item = (struct distr_item *)msg_data(buf_msg(buf)); item = (struct distr_item *)msg_data(buf_msg(buf));
publ_to_item(item, publ); publ_to_item(item, publ);
named_cluster_distribute(buf); return buf;
} }
/** /**
* tipc_named_withdraw - tell other nodes about a withdrawn publication by this node * tipc_named_withdraw - tell other nodes about a withdrawn publication by this node
*/ */
void tipc_named_withdraw(struct publication *publ) struct sk_buff *tipc_named_withdraw(struct publication *publ)
{ {
struct sk_buff *buf; struct sk_buff *buf;
struct distr_item *item; struct distr_item *item;
...@@ -190,17 +190,17 @@ void tipc_named_withdraw(struct publication *publ) ...@@ -190,17 +190,17 @@ void tipc_named_withdraw(struct publication *publ)
publ_lists[publ->scope]->size--; publ_lists[publ->scope]->size--;
if (publ->scope == TIPC_NODE_SCOPE) if (publ->scope == TIPC_NODE_SCOPE)
return; return NULL;
buf = named_prepare_buf(WITHDRAWAL, ITEM_SIZE, 0); buf = named_prepare_buf(WITHDRAWAL, ITEM_SIZE, 0);
if (!buf) { if (!buf) {
pr_warn("Withdrawal distribution failure\n"); pr_warn("Withdrawal distribution failure\n");
return; return NULL;
} }
item = (struct distr_item *)msg_data(buf_msg(buf)); item = (struct distr_item *)msg_data(buf_msg(buf));
publ_to_item(item, publ); publ_to_item(item, publ);
named_cluster_distribute(buf); return buf;
} }
/* /*
......
...@@ -39,8 +39,9 @@ ...@@ -39,8 +39,9 @@
#include "name_table.h" #include "name_table.h"
void tipc_named_publish(struct publication *publ); struct sk_buff *tipc_named_publish(struct publication *publ);
void tipc_named_withdraw(struct publication *publ); struct sk_buff *tipc_named_withdraw(struct publication *publ);
void named_cluster_distribute(struct sk_buff *buf);
void tipc_named_node_up(unsigned long node); void tipc_named_node_up(unsigned long node);
void tipc_named_rcv(struct sk_buff *buf); void tipc_named_rcv(struct sk_buff *buf);
void tipc_named_reinit(void); void tipc_named_reinit(void);
......
...@@ -664,6 +664,7 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper, ...@@ -664,6 +664,7 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
u32 scope, u32 port_ref, u32 key) u32 scope, u32 port_ref, u32 key)
{ {
struct publication *publ; struct publication *publ;
struct sk_buff *buf = NULL;
if (table.local_publ_count >= TIPC_MAX_PUBLICATIONS) { if (table.local_publ_count >= TIPC_MAX_PUBLICATIONS) {
pr_warn("Publication failed, local publication limit reached (%u)\n", pr_warn("Publication failed, local publication limit reached (%u)\n",
...@@ -676,9 +677,12 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper, ...@@ -676,9 +677,12 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
tipc_own_addr, port_ref, key); tipc_own_addr, port_ref, key);
if (likely(publ)) { if (likely(publ)) {
table.local_publ_count++; table.local_publ_count++;
tipc_named_publish(publ); buf = tipc_named_publish(publ);
} }
write_unlock_bh(&tipc_nametbl_lock); write_unlock_bh(&tipc_nametbl_lock);
if (buf)
named_cluster_distribute(buf);
return publ; return publ;
} }
...@@ -688,15 +692,19 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper, ...@@ -688,15 +692,19 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key) int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key)
{ {
struct publication *publ; struct publication *publ;
struct sk_buff *buf;
write_lock_bh(&tipc_nametbl_lock); write_lock_bh(&tipc_nametbl_lock);
publ = tipc_nametbl_remove_publ(type, lower, tipc_own_addr, ref, key); publ = tipc_nametbl_remove_publ(type, lower, tipc_own_addr, ref, key);
if (likely(publ)) { if (likely(publ)) {
table.local_publ_count--; table.local_publ_count--;
tipc_named_withdraw(publ); buf = tipc_named_withdraw(publ);
write_unlock_bh(&tipc_nametbl_lock); write_unlock_bh(&tipc_nametbl_lock);
list_del_init(&publ->pport_list); list_del_init(&publ->pport_list);
kfree(publ); kfree(publ);
if (buf)
named_cluster_distribute(buf);
return 1; return 1;
} }
write_unlock_bh(&tipc_nametbl_lock); write_unlock_bh(&tipc_nametbl_lock);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment