Commit 218527fe authored by Jon Maloy's avatar Jon Maloy Committed by David S. Miller

tipc: replace name table service range array with rb tree

The current design of the binding table has an unnecessary memory
consuming and complex data structure. It aggregates the service range
items into an array, which is expanded by a factor two every time it
becomes too small to hold a new item. Furthermore, the arrays never
shrink when the number of ranges diminishes.

We now replace this array with an RB tree that is holding the range
items as tree nodes, each range directly holding a list of bindings.

This, along with a few name changes, improves both readability and
volume of the code, as well as reducing memory consumption and hopefully
improving cache hit rate.
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 24197ee2
...@@ -58,6 +58,7 @@ ...@@ -58,6 +58,7 @@
#include <linux/etherdevice.h> #include <linux/etherdevice.h>
#include <net/netns/generic.h> #include <net/netns/generic.h>
#include <linux/rhashtable.h> #include <linux/rhashtable.h>
#include <net/genetlink.h>
struct tipc_node; struct tipc_node;
struct tipc_bearer; struct tipc_bearer;
......
...@@ -1810,7 +1810,7 @@ int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb, ...@@ -1810,7 +1810,7 @@ int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
{ {
int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE); int max_bulk = TIPC_MAX_PUBL / (l->mtu / ITEM_SIZE);
l->window = win; l->window = win;
l->backlog[TIPC_LOW_IMPORTANCE].limit = max_t(u16, 50, win); l->backlog[TIPC_LOW_IMPORTANCE].limit = max_t(u16, 50, win);
......
...@@ -44,52 +44,40 @@ ...@@ -44,52 +44,40 @@
#include "addr.h" #include "addr.h"
#include "node.h" #include "node.h"
#include "group.h" #include "group.h"
#include <net/genetlink.h>
#define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */
/** /**
* struct name_info - name sequence publication info * struct service_range - container for all bindings of a service range
* @node_list: list of publications on own node of this <type,lower,upper> * @lower: service range lower bound
* @all_publ: list of all publications of this <type,lower,upper> * @upper: service range upper bound
* @tree_node: member of service range RB tree
* @local_publ: list of identical publications made from this node
* Used by closest_first lookup and multicast lookup algorithm
* @all_publ: all publications identical to this one, whatever node and scope
* Used by round-robin lookup algorithm
*/ */
struct name_info { struct service_range {
struct list_head local_publ;
struct list_head all_publ;
};
/**
* struct sub_seq - container for all published instances of a name sequence
* @lower: name sequence lower bound
* @upper: name sequence upper bound
* @info: pointer to name sequence publication info
*/
struct sub_seq {
u32 lower; u32 lower;
u32 upper; u32 upper;
struct name_info *info; struct rb_node tree_node;
struct list_head local_publ;
struct list_head all_publ;
}; };
/** /**
* struct name_seq - container for all published instances of a name type * struct tipc_service - container for all published instances of a service type
* @type: 32 bit 'type' value for name sequence * @type: 32 bit 'type' value for service
* @sseq: pointer to dynamically-sized array of sub-sequences of this 'type'; * @ranges: rb tree containing all service ranges for this service
* sub-sequences are sorted in ascending order * @service_list: links to adjacent name ranges in hash chain
* @alloc: number of sub-sequences currently in array * @subscriptions: list of subscriptions for this service type
* @first_free: array index of first unused sub-sequence entry * @lock: spinlock controlling access to pertaining service ranges/publications
* @ns_list: links to adjacent name sequences in hash chain
* @subscriptions: list of subscriptions for this 'type'
* @lock: spinlock controlling access to publication lists of all sub-sequences
* @rcu: RCU callback head used for deferred freeing * @rcu: RCU callback head used for deferred freeing
*/ */
struct name_seq { struct tipc_service {
u32 type; u32 type;
struct sub_seq *sseqs; struct rb_root ranges;
u32 alloc; struct hlist_node service_list;
u32 first_free;
struct hlist_node ns_list;
struct list_head subscriptions; struct list_head subscriptions;
spinlock_t lock; spinlock_t lock; /* Covers service range list */
struct rcu_head rcu; struct rcu_head rcu;
}; };
...@@ -99,17 +87,16 @@ static int hash(int x) ...@@ -99,17 +87,16 @@ static int hash(int x)
} }
/** /**
* publ_create - create a publication structure * tipc_publ_create - create a publication structure
*/ */
static struct publication *publ_create(u32 type, u32 lower, u32 upper, static struct publication *tipc_publ_create(u32 type, u32 lower, u32 upper,
u32 scope, u32 node, u32 port, u32 scope, u32 node, u32 port,
u32 key) u32 key)
{ {
struct publication *publ = kzalloc(sizeof(*publ), GFP_ATOMIC); struct publication *publ = kzalloc(sizeof(*publ), GFP_ATOMIC);
if (publ == NULL) {
pr_warn("Publication creation failure, no memory\n"); if (!publ)
return NULL; return NULL;
}
publ->type = type; publ->type = type;
publ->lower = lower; publ->lower = lower;
...@@ -119,372 +106,298 @@ static struct publication *publ_create(u32 type, u32 lower, u32 upper, ...@@ -119,372 +106,298 @@ static struct publication *publ_create(u32 type, u32 lower, u32 upper,
publ->port = port; publ->port = port;
publ->key = key; publ->key = key;
INIT_LIST_HEAD(&publ->binding_sock); INIT_LIST_HEAD(&publ->binding_sock);
INIT_LIST_HEAD(&publ->binding_node);
INIT_LIST_HEAD(&publ->local_publ);
INIT_LIST_HEAD(&publ->all_publ);
return publ; return publ;
} }
/** /**
* tipc_subseq_alloc - allocate a specified number of sub-sequence structures * tipc_service_create - create a service structure for the specified 'type'
*/
static struct sub_seq *tipc_subseq_alloc(u32 cnt)
{
return kcalloc(cnt, sizeof(struct sub_seq), GFP_ATOMIC);
}
/**
* tipc_nameseq_create - create a name sequence structure for the specified 'type'
* *
* Allocates a single sub-sequence structure and sets it to all 0's. * Allocates a single range structure and sets it to all 0's.
*/ */
static struct name_seq *tipc_nameseq_create(u32 type, struct hlist_head *seq_head) static struct tipc_service *tipc_service_create(u32 type, struct hlist_head *hd)
{ {
struct name_seq *nseq = kzalloc(sizeof(*nseq), GFP_ATOMIC); struct tipc_service *service = kzalloc(sizeof(*service), GFP_ATOMIC);
struct sub_seq *sseq = tipc_subseq_alloc(1);
if (!nseq || !sseq) { if (!service) {
pr_warn("Name sequence creation failed, no memory\n"); pr_warn("Service creation failed, no memory\n");
kfree(nseq);
kfree(sseq);
return NULL; return NULL;
} }
spin_lock_init(&nseq->lock); spin_lock_init(&service->lock);
nseq->type = type; service->type = type;
nseq->sseqs = sseq; service->ranges = RB_ROOT;
nseq->alloc = 1; INIT_HLIST_NODE(&service->service_list);
INIT_HLIST_NODE(&nseq->ns_list); INIT_LIST_HEAD(&service->subscriptions);
INIT_LIST_HEAD(&nseq->subscriptions); hlist_add_head_rcu(&service->service_list, hd);
hlist_add_head_rcu(&nseq->ns_list, seq_head); return service;
return nseq;
} }
/** /**
* nameseq_find_subseq - find sub-sequence (if any) matching a name instance * tipc_service_find_range - find service range matching a service instance
* *
* Very time-critical, so binary searches through sub-sequence array. * Very time-critical, so binary search through range rb tree
*/ */
static struct sub_seq *nameseq_find_subseq(struct name_seq *nseq, static struct service_range *tipc_service_find_range(struct tipc_service *sc,
u32 instance) u32 instance)
{ {
struct sub_seq *sseqs = nseq->sseqs; struct rb_node *n = sc->ranges.rb_node;
int low = 0; struct service_range *sr;
int high = nseq->first_free - 1;
int mid; while (n) {
sr = container_of(n, struct service_range, tree_node);
while (low <= high) { if (sr->lower > instance)
mid = (low + high) / 2; n = n->rb_left;
if (instance < sseqs[mid].lower) else if (sr->upper < instance)
high = mid - 1; n = n->rb_right;
else if (instance > sseqs[mid].upper)
low = mid + 1;
else else
return &sseqs[mid]; return sr;
} }
return NULL; return NULL;
} }
/** static struct service_range *tipc_service_create_range(struct tipc_service *sc,
* nameseq_locate_subseq - determine position of name instance in sub-sequence u32 lower, u32 upper)
*
* Returns index in sub-sequence array of the entry that contains the specified
* instance value; if no entry contains that value, returns the position
* where a new entry for it would be inserted in the array.
*
* Note: Similar to binary search code for locating a sub-sequence.
*/
static u32 nameseq_locate_subseq(struct name_seq *nseq, u32 instance)
{ {
struct sub_seq *sseqs = nseq->sseqs; struct rb_node **n, *parent = NULL;
int low = 0; struct service_range *sr, *tmp;
int high = nseq->first_free - 1;
int mid; n = &sc->ranges.rb_node;
while (*n) {
while (low <= high) { tmp = container_of(*n, struct service_range, tree_node);
mid = (low + high) / 2; parent = *n;
if (instance < sseqs[mid].lower) tmp = container_of(parent, struct service_range, tree_node);
high = mid - 1; if (lower < tmp->lower)
else if (instance > sseqs[mid].upper) n = &(*n)->rb_left;
low = mid + 1; else if (upper > tmp->upper)
n = &(*n)->rb_right;
else else
return mid; return NULL;
} }
return low; sr = kzalloc(sizeof(*sr), GFP_ATOMIC);
if (!sr)
return NULL;
sr->lower = lower;
sr->upper = upper;
INIT_LIST_HEAD(&sr->local_publ);
INIT_LIST_HEAD(&sr->all_publ);
rb_link_node(&sr->tree_node, parent, n);
rb_insert_color(&sr->tree_node, &sc->ranges);
return sr;
} }
/** static struct publication *tipc_service_insert_publ(struct net *net,
* tipc_nameseq_insert_publ struct tipc_service *sc,
*/
static struct publication *tipc_nameseq_insert_publ(struct net *net,
struct name_seq *nseq,
u32 type, u32 lower, u32 type, u32 lower,
u32 upper, u32 scope, u32 upper, u32 scope,
u32 node, u32 port, u32 key) u32 node, u32 port,
u32 key)
{ {
struct tipc_subscription *s; struct tipc_subscription *sub, *tmp;
struct tipc_subscription *st; struct service_range *sr;
struct publication *publ; struct publication *p;
struct sub_seq *sseq; bool first = false;
struct name_info *info;
int created_subseq = 0; sr = tipc_service_find_range(sc, lower);
if (!sr) {
sseq = nameseq_find_subseq(nseq, lower); sr = tipc_service_create_range(sc, lower, upper);
if (sseq) { if (!sr)
goto err;
/* Lower end overlaps existing entry => need an exact match */ first = true;
if ((sseq->lower != lower) || (sseq->upper != upper)) {
return NULL;
}
info = sseq->info;
/* Check if an identical publication already exists */
list_for_each_entry(publ, &info->all_publ, all_publ) {
if (publ->port == port && publ->key == key &&
(!publ->node || publ->node == node))
return NULL;
}
} else {
u32 inspos;
struct sub_seq *freesseq;
/* Find where lower end should be inserted */
inspos = nameseq_locate_subseq(nseq, lower);
/* Fail if upper end overlaps into an existing entry */
if ((inspos < nseq->first_free) &&
(upper >= nseq->sseqs[inspos].lower)) {
return NULL;
}
/* Ensure there is space for new sub-sequence */
if (nseq->first_free == nseq->alloc) {
struct sub_seq *sseqs = tipc_subseq_alloc(nseq->alloc * 2);
if (!sseqs) {
pr_warn("Cannot publish {%u,%u,%u}, no memory\n",
type, lower, upper);
return NULL;
}
memcpy(sseqs, nseq->sseqs,
nseq->alloc * sizeof(struct sub_seq));
kfree(nseq->sseqs);
nseq->sseqs = sseqs;
nseq->alloc *= 2;
}
info = kzalloc(sizeof(*info), GFP_ATOMIC);
if (!info) {
pr_warn("Cannot publish {%u,%u,%u}, no memory\n",
type, lower, upper);
return NULL;
}
INIT_LIST_HEAD(&info->local_publ);
INIT_LIST_HEAD(&info->all_publ);
/* Insert new sub-sequence */
sseq = &nseq->sseqs[inspos];
freesseq = &nseq->sseqs[nseq->first_free];
memmove(sseq + 1, sseq, (freesseq - sseq) * sizeof(*sseq));
memset(sseq, 0, sizeof(*sseq));
nseq->first_free++;
sseq->lower = lower;
sseq->upper = upper;
sseq->info = info;
created_subseq = 1;
} }
/* Insert a publication */ /* Lower end overlaps existing entry, but we need an exact match */
publ = publ_create(type, lower, upper, scope, node, port, key); if (sr->lower != lower || sr->upper != upper)
if (!publ)
return NULL; return NULL;
list_add(&publ->all_publ, &info->all_publ); /* Return if the publication already exists */
list_for_each_entry(p, &sr->all_publ, all_publ) {
if (p->key == key && (!p->node || p->node == node))
return NULL;
}
/* Create and insert publication */
p = tipc_publ_create(type, lower, upper, scope, node, port, key);
if (!p)
goto err;
if (in_own_node(net, node)) if (in_own_node(net, node))
list_add(&publ->local_publ, &info->local_publ); list_add(&p->local_publ, &sr->local_publ);
list_add(&p->all_publ, &sr->all_publ);
/* Any subscriptions waiting for notification? */ /* Any subscriptions waiting for notification? */
list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { list_for_each_entry_safe(sub, tmp, &sc->subscriptions, service_list) {
tipc_sub_report_overlap(s, publ->lower, publ->upper, tipc_sub_report_overlap(sub, p->lower, p->upper, TIPC_PUBLISHED,
TIPC_PUBLISHED, publ->port, p->port, p->node, p->scope, first);
publ->node, publ->scope,
created_subseq);
} }
return publ; return p;
err:
pr_warn("Failed to bind to %u,%u,%u, no memory\n", type, lower, upper);
return NULL;
} }
/** /**
* tipc_nameseq_remove_publ * tipc_service_remove_publ - remove a publication from a service
* *
* NOTE: There may be cases where TIPC is asked to remove a publication * NOTE: There may be cases where TIPC is asked to remove a publication
* that is not in the name table. For example, if another node issues a * that is not in the name table. For example, if another node issues a
* publication for a name sequence that overlaps an existing name sequence * publication for a name range that overlaps an existing name range
* the publication will not be recorded, which means the publication won't * the publication will not be recorded, which means the publication won't
* be found when the name sequence is later withdrawn by that node. * be found when the name range is later withdrawn by that node.
* A failed withdraw request simply returns a failure indication and lets the * A failed withdraw request simply returns a failure indication and lets the
* caller issue any error or warning messages associated with such a problem. * caller issue any error or warning messages associated with such a problem.
*/ */
static struct publication *tipc_nameseq_remove_publ(struct net *net, static struct publication *tipc_service_remove_publ(struct net *net,
struct name_seq *nseq, struct tipc_service *sc,
u32 inst, u32 node, u32 inst, u32 node,
u32 port, u32 key) u32 port, u32 key)
{ {
struct publication *publ; struct tipc_subscription *sub, *tmp;
struct sub_seq *sseq = nameseq_find_subseq(nseq, inst); struct service_range *sr;
struct name_info *info; struct publication *p;
struct sub_seq *free; bool found = false;
struct tipc_subscription *s, *st; bool last = false;
int removed_subseq = 0;
if (!sseq)
return NULL;
info = sseq->info; sr = tipc_service_find_range(sc, inst);
if (!sr)
return NULL;
/* Locate publication, if it exists */ /* Find publication, if it exists */
list_for_each_entry(publ, &info->all_publ, all_publ) { list_for_each_entry(p, &sr->all_publ, all_publ) {
if (publ->key == key && publ->port == port && if (p->key != key || (node && node != p->node))
(!publ->node || publ->node == node)) continue;
goto found; found = true;
break;
} }
return NULL; if (!found)
return NULL;
found: list_del(&p->all_publ);
list_del(&publ->all_publ); list_del(&p->local_publ);
if (in_own_node(net, node))
list_del(&publ->local_publ); /* Remove service range item if this was its last publication */
if (list_empty(&sr->all_publ)) {
/* Contract subseq list if no more publications for that subseq */ last = true;
if (list_empty(&info->all_publ)) { rb_erase(&sr->tree_node, &sc->ranges);
kfree(info); kfree(sr);
free = &nseq->sseqs[nseq->first_free--];
memmove(sseq, sseq + 1, (free - (sseq + 1)) * sizeof(*sseq));
removed_subseq = 1;
} }
/* Notify any waiting subscriptions */ /* Notify any waiting subscriptions */
list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { list_for_each_entry_safe(sub, tmp, &sc->subscriptions, service_list) {
tipc_sub_report_overlap(s, publ->lower, publ->upper, tipc_sub_report_overlap(sub, p->lower, p->upper, TIPC_WITHDRAWN,
TIPC_WITHDRAWN, publ->port, p->port, p->node, p->scope, last);
publ->node, publ->scope,
removed_subseq);
} }
return p;
return publ;
} }
/** /**
* tipc_nameseq_subscribe - attach a subscription, and optionally * tipc_service_subscribe - attach a subscription, and optionally
* issue the prescribed number of events if there is any sub- * issue the prescribed number of events if there is any service
* sequence overlapping with the requested sequence * range overlapping with the requested range
*/ */
static void tipc_nameseq_subscribe(struct name_seq *nseq, static void tipc_service_subscribe(struct tipc_service *service,
struct tipc_subscription *sub) struct tipc_subscription *sub)
{ {
struct sub_seq *sseq = nseq->sseqs; struct tipc_subscr *sb = &sub->evt.s;
struct service_range *sr;
struct tipc_name_seq ns; struct tipc_name_seq ns;
struct tipc_subscr *s = &sub->evt.s; struct publication *p;
bool no_status; struct rb_node *n;
bool first;
ns.type = tipc_sub_read(s, seq.type); ns.type = tipc_sub_read(sb, seq.type);
ns.lower = tipc_sub_read(s, seq.lower); ns.lower = tipc_sub_read(sb, seq.lower);
ns.upper = tipc_sub_read(s, seq.upper); ns.upper = tipc_sub_read(sb, seq.upper);
no_status = tipc_sub_read(s, filter) & TIPC_SUB_NO_STATUS;
tipc_sub_get(sub); tipc_sub_get(sub);
list_add(&sub->nameseq_list, &nseq->subscriptions); list_add(&sub->service_list, &service->subscriptions);
if (no_status || !sseq) if (tipc_sub_read(sb, filter) & TIPC_SUB_NO_STATUS)
return; return;
while (sseq != &nseq->sseqs[nseq->first_free]) { for (n = rb_first(&service->ranges); n; n = rb_next(n)) {
if (tipc_sub_check_overlap(&ns, sseq->lower, sseq->upper)) { sr = container_of(n, struct service_range, tree_node);
struct publication *crs; if (sr->lower > ns.upper)
struct name_info *info = sseq->info; break;
int must_report = 1; if (!tipc_sub_check_overlap(&ns, sr->lower, sr->upper))
continue;
list_for_each_entry(crs, &info->all_publ, all_publ) { first = true;
tipc_sub_report_overlap(sub, sseq->lower,
sseq->upper, list_for_each_entry(p, &sr->all_publ, all_publ) {
TIPC_PUBLISHED, tipc_sub_report_overlap(sub, sr->lower, sr->upper,
crs->port, TIPC_PUBLISHED, p->port,
crs->node, p->node, p->scope, first);
crs->scope, first = false;
must_report);
must_report = 0;
}
} }
sseq++;
} }
} }
static struct name_seq *nametbl_find_seq(struct net *net, u32 type) static struct tipc_service *tipc_service_find(struct net *net, u32 type)
{ {
struct tipc_net *tn = net_generic(net, tipc_net_id); struct name_table *nt = tipc_name_table(net);
struct hlist_head *seq_head; struct hlist_head *service_head;
struct name_seq *ns; struct tipc_service *service;
seq_head = &tn->nametbl->seq_hlist[hash(type)]; service_head = &nt->services[hash(type)];
hlist_for_each_entry_rcu(ns, seq_head, ns_list) { hlist_for_each_entry_rcu(service, service_head, service_list) {
if (ns->type == type) if (service->type == type)
return ns; return service;
} }
return NULL; return NULL;
}; };
struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type, struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type,
u32 lower, u32 upper, u32 scope, u32 lower, u32 upper,
u32 node, u32 port, u32 key) u32 scope, u32 node,
u32 port, u32 key)
{ {
struct tipc_net *tn = net_generic(net, tipc_net_id); struct name_table *nt = tipc_name_table(net);
struct publication *publ; struct tipc_service *sc;
struct name_seq *seq = nametbl_find_seq(net, type); struct publication *p;
int index = hash(type);
if (scope > TIPC_NODE_SCOPE || lower > upper) { if (scope > TIPC_NODE_SCOPE || lower > upper) {
pr_debug("Failed to publish illegal {%u,%u,%u} with scope %u\n", pr_debug("Failed to bind illegal {%u,%u,%u} with scope %u\n",
type, lower, upper, scope); type, lower, upper, scope);
return NULL; return NULL;
} }
sc = tipc_service_find(net, type);
if (!seq) if (!sc)
seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); sc = tipc_service_create(type, &nt->services[hash(type)]);
if (!seq) if (!sc)
return NULL; return NULL;
spin_lock_bh(&seq->lock); spin_lock_bh(&sc->lock);
publ = tipc_nameseq_insert_publ(net, seq, type, lower, upper, p = tipc_service_insert_publ(net, sc, type, lower, upper,
scope, node, port, key); scope, node, port, key);
spin_unlock_bh(&seq->lock); spin_unlock_bh(&sc->lock);
return publ; return p;
} }
struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type, struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type,
u32 lower, u32 node, u32 port, u32 lower, u32 node, u32 port,
u32 key) u32 key)
{ {
struct publication *publ; struct tipc_service *sc = tipc_service_find(net, type);
struct name_seq *seq = nametbl_find_seq(net, type); struct publication *p = NULL;
if (!seq) if (!sc)
return NULL; return NULL;
spin_lock_bh(&seq->lock); spin_lock_bh(&sc->lock);
publ = tipc_nameseq_remove_publ(net, seq, lower, node, port, key); p = tipc_service_remove_publ(net, sc, lower, node, port, key);
if (!seq->first_free && list_empty(&seq->subscriptions)) {
hlist_del_init_rcu(&seq->ns_list); /* Delete service item if this no more publications and subscriptions */
kfree(seq->sseqs); if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) {
spin_unlock_bh(&seq->lock); hlist_del_init_rcu(&sc->service_list);
kfree_rcu(seq, rcu); kfree_rcu(sc, rcu);
return publ;
} }
spin_unlock_bh(&seq->lock); spin_unlock_bh(&sc->lock);
return publ; return p;
} }
/** /**
* tipc_nametbl_translate - perform name translation * tipc_nametbl_translate - perform service instance to socket translation
* *
* On entry, 'destnode' is the search domain used during translation. * On entry, 'destnode' is the search domain used during translation.
* *
...@@ -492,7 +405,7 @@ struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type, ...@@ -492,7 +405,7 @@ struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type,
* - if name translation is deferred to another node/cluster/zone, * - if name translation is deferred to another node/cluster/zone,
* leaves 'destnode' unchanged (will be non-zero) and returns 0 * leaves 'destnode' unchanged (will be non-zero) and returns 0
* - if name translation is attempted and succeeds, sets 'destnode' * - if name translation is attempted and succeeds, sets 'destnode'
* to publishing node and returns port reference (will be non-zero) * to publication node and returns port reference (will be non-zero)
* - if name translation is attempted and fails, sets 'destnode' to 0 * - if name translation is attempted and fails, sets 'destnode' to 0
* and returns 0 * and returns 0
*/ */
...@@ -502,10 +415,9 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, ...@@ -502,10 +415,9 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance,
struct tipc_net *tn = tipc_net(net); struct tipc_net *tn = tipc_net(net);
bool legacy = tn->legacy_addr_format; bool legacy = tn->legacy_addr_format;
u32 self = tipc_own_addr(net); u32 self = tipc_own_addr(net);
struct sub_seq *sseq; struct service_range *sr;
struct name_info *info; struct tipc_service *sc;
struct publication *publ; struct publication *p;
struct name_seq *seq;
u32 port = 0; u32 port = 0;
u32 node = 0; u32 node = 0;
...@@ -513,49 +425,49 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, ...@@ -513,49 +425,49 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance,
return 0; return 0;
rcu_read_lock(); rcu_read_lock();
seq = nametbl_find_seq(net, type); sc = tipc_service_find(net, type);
if (unlikely(!seq)) if (unlikely(!sc))
goto not_found; goto not_found;
spin_lock_bh(&seq->lock);
sseq = nameseq_find_subseq(seq, instance); spin_lock_bh(&sc->lock);
if (unlikely(!sseq)) sr = tipc_service_find_range(sc, instance);
if (unlikely(!sr))
goto no_match; goto no_match;
info = sseq->info;
/* Closest-First Algorithm */ /* Closest-First Algorithm */
if (legacy && !*destnode) { if (legacy && !*destnode) {
if (!list_empty(&info->local_publ)) { if (!list_empty(&sr->local_publ)) {
publ = list_first_entry(&info->local_publ, p = list_first_entry(&sr->local_publ,
struct publication, struct publication,
local_publ); local_publ);
list_move_tail(&publ->local_publ, list_move_tail(&p->local_publ,
&info->local_publ); &sr->local_publ);
} else { } else {
publ = list_first_entry(&info->all_publ, p = list_first_entry(&sr->all_publ,
struct publication, struct publication,
all_publ); all_publ);
list_move_tail(&publ->all_publ, list_move_tail(&p->all_publ,
&info->all_publ); &sr->all_publ);
} }
} }
/* Round-Robin Algorithm */ /* Round-Robin Algorithm */
else if (*destnode == tipc_own_addr(net)) { else if (*destnode == self) {
if (list_empty(&info->local_publ)) if (list_empty(&sr->local_publ))
goto no_match; goto no_match;
publ = list_first_entry(&info->local_publ, struct publication, p = list_first_entry(&sr->local_publ, struct publication,
local_publ); local_publ);
list_move_tail(&publ->local_publ, &info->local_publ); list_move_tail(&p->local_publ, &sr->local_publ);
} else { } else {
publ = list_first_entry(&info->all_publ, struct publication, p = list_first_entry(&sr->all_publ, struct publication,
all_publ); all_publ);
list_move_tail(&publ->all_publ, &info->all_publ); list_move_tail(&p->all_publ, &sr->all_publ);
} }
port = publ->port; port = p->port;
node = publ->node; node = p->node;
no_match: no_match:
spin_unlock_bh(&seq->lock); spin_unlock_bh(&sc->lock);
not_found: not_found:
rcu_read_unlock(); rcu_read_unlock();
*destnode = node; *destnode = node;
...@@ -567,34 +479,36 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope, ...@@ -567,34 +479,36 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope,
bool all) bool all)
{ {
u32 self = tipc_own_addr(net); u32 self = tipc_own_addr(net);
struct publication *publ; struct service_range *sr;
struct name_info *info; struct tipc_service *sc;
struct name_seq *seq; struct publication *p;
struct sub_seq *sseq;
*dstcnt = 0; *dstcnt = 0;
rcu_read_lock(); rcu_read_lock();
seq = nametbl_find_seq(net, type); sc = tipc_service_find(net, type);
if (unlikely(!seq)) if (unlikely(!sc))
goto exit; goto exit;
spin_lock_bh(&seq->lock);
sseq = nameseq_find_subseq(seq, instance); spin_lock_bh(&sc->lock);
if (likely(sseq)) {
info = sseq->info; sr = tipc_service_find_range(sc, instance);
list_for_each_entry(publ, &info->all_publ, all_publ) { if (!sr)
if (publ->scope != scope) goto no_match;
continue;
if (publ->port == exclude && publ->node == self) list_for_each_entry(p, &sr->all_publ, all_publ) {
continue; if (p->scope != scope)
tipc_dest_push(dsts, publ->node, publ->port); continue;
(*dstcnt)++; if (p->port == exclude && p->node == self)
if (all) continue;
continue; tipc_dest_push(dsts, p->node, p->port);
list_move_tail(&publ->all_publ, &info->all_publ); (*dstcnt)++;
break; if (all)
} continue;
list_move_tail(&p->all_publ, &sr->all_publ);
break;
} }
spin_unlock_bh(&seq->lock); no_match:
spin_unlock_bh(&sc->lock);
exit: exit:
rcu_read_unlock(); rcu_read_unlock();
return !list_empty(dsts); return !list_empty(dsts);
...@@ -603,61 +517,64 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope, ...@@ -603,61 +517,64 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope,
void tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper, void tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
u32 scope, bool exact, struct list_head *dports) u32 scope, bool exact, struct list_head *dports)
{ {
struct sub_seq *sseq_stop; struct service_range *sr;
struct name_info *info; struct tipc_service *sc;
struct publication *p; struct publication *p;
struct name_seq *seq; struct rb_node *n;
struct sub_seq *sseq;
rcu_read_lock(); rcu_read_lock();
seq = nametbl_find_seq(net, type); sc = tipc_service_find(net, type);
if (!seq) if (!sc)
goto exit; goto exit;
spin_lock_bh(&seq->lock); spin_lock_bh(&sc->lock);
sseq = seq->sseqs + nameseq_locate_subseq(seq, lower);
sseq_stop = seq->sseqs + seq->first_free; for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
for (; sseq != sseq_stop; sseq++) { sr = container_of(n, struct service_range, tree_node);
if (sseq->lower > upper) if (sr->upper < lower)
continue;
if (sr->lower > upper)
break; break;
info = sseq->info; list_for_each_entry(p, &sr->local_publ, local_publ) {
list_for_each_entry(p, &info->local_publ, local_publ) {
if (p->scope == scope || (!exact && p->scope < scope)) if (p->scope == scope || (!exact && p->scope < scope))
tipc_dest_push(dports, 0, p->port); tipc_dest_push(dports, 0, p->port);
} }
} }
spin_unlock_bh(&seq->lock); spin_unlock_bh(&sc->lock);
exit: exit:
rcu_read_unlock(); rcu_read_unlock();
} }
/* tipc_nametbl_lookup_dst_nodes - find broadcast destination nodes /* tipc_nametbl_lookup_dst_nodes - find broadcast destination nodes
* - Creates list of nodes that overlap the given multicast address * - Creates list of nodes that overlap the given multicast address
* - Determines if any node local ports overlap * - Determines if any node local destinations overlap
*/ */
void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
u32 upper, struct tipc_nlist *nodes) u32 upper, struct tipc_nlist *nodes)
{ {
struct sub_seq *sseq, *stop; struct service_range *sr;
struct publication *publ; struct tipc_service *sc;
struct name_info *info; struct publication *p;
struct name_seq *seq; struct rb_node *n;
rcu_read_lock(); rcu_read_lock();
seq = nametbl_find_seq(net, type); sc = tipc_service_find(net, type);
if (!seq) if (!sc)
goto exit; goto exit;
spin_lock_bh(&seq->lock); spin_lock_bh(&sc->lock);
sseq = seq->sseqs + nameseq_locate_subseq(seq, lower);
stop = seq->sseqs + seq->first_free; for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
for (; sseq != stop && sseq->lower <= upper; sseq++) { sr = container_of(n, struct service_range, tree_node);
info = sseq->info; if (sr->upper < lower)
list_for_each_entry(publ, &info->all_publ, all_publ) { continue;
tipc_nlist_add(nodes, publ->node); if (sr->lower > upper)
break;
list_for_each_entry(p, &sr->all_publ, all_publ) {
tipc_nlist_add(nodes, p->node);
} }
} }
spin_unlock_bh(&seq->lock); spin_unlock_bh(&sc->lock);
exit: exit:
rcu_read_unlock(); rcu_read_unlock();
} }
...@@ -667,89 +584,88 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, ...@@ -667,89 +584,88 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
u32 type, u32 scope) u32 type, u32 scope)
{ {
struct sub_seq *sseq, *stop; struct service_range *sr;
struct name_info *info; struct tipc_service *sc;
struct publication *p; struct publication *p;
struct name_seq *seq; struct rb_node *n;
rcu_read_lock(); rcu_read_lock();
seq = nametbl_find_seq(net, type); sc = tipc_service_find(net, type);
if (!seq) if (!sc)
goto exit; goto exit;
spin_lock_bh(&seq->lock); spin_lock_bh(&sc->lock);
sseq = seq->sseqs; for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
stop = seq->sseqs + seq->first_free; sr = container_of(n, struct service_range, tree_node);
for (; sseq != stop; sseq++) { list_for_each_entry(p, &sr->all_publ, all_publ) {
info = sseq->info;
list_for_each_entry(p, &info->all_publ, all_publ) {
if (p->scope != scope) if (p->scope != scope)
continue; continue;
tipc_group_add_member(grp, p->node, p->port, p->lower); tipc_group_add_member(grp, p->node, p->port, p->lower);
} }
} }
spin_unlock_bh(&seq->lock); spin_unlock_bh(&sc->lock);
exit: exit:
rcu_read_unlock(); rcu_read_unlock();
} }
/* /* tipc_nametbl_publish - add service binding to name table
* tipc_nametbl_publish - add name publication to network name tables
*/ */
struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower, struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
u32 upper, u32 scope, u32 port_ref, u32 upper, u32 scope, u32 port,
u32 key) u32 key)
{ {
struct publication *publ; struct name_table *nt = tipc_name_table(net);
struct sk_buff *buf = NULL; struct tipc_net *tn = tipc_net(net);
struct tipc_net *tn = net_generic(net, tipc_net_id); struct publication *p = NULL;
struct sk_buff *skb = NULL;
spin_lock_bh(&tn->nametbl_lock); spin_lock_bh(&tn->nametbl_lock);
if (tn->nametbl->local_publ_count >= TIPC_MAX_PUBLICATIONS) {
pr_warn("Publication failed, local publication limit reached (%u)\n", if (nt->local_publ_count >= TIPC_MAX_PUBL) {
TIPC_MAX_PUBLICATIONS); pr_warn("Bind failed, max limit %u reached\n", TIPC_MAX_PUBL);
spin_unlock_bh(&tn->nametbl_lock); goto exit;
return NULL;
} }
publ = tipc_nametbl_insert_publ(net, type, lower, upper, scope, p = tipc_nametbl_insert_publ(net, type, lower, upper, scope,
tipc_own_addr(net), port_ref, key); tipc_own_addr(net), port, key);
if (likely(publ)) { if (p) {
tn->nametbl->local_publ_count++; nt->local_publ_count++;
buf = tipc_named_publish(net, publ); skb = tipc_named_publish(net, p);
/* Any pending external events? */ /* Any pending external events? */
tipc_named_process_backlog(net); tipc_named_process_backlog(net);
} }
exit:
spin_unlock_bh(&tn->nametbl_lock); spin_unlock_bh(&tn->nametbl_lock);
if (buf) if (skb)
tipc_node_broadcast(net, buf); tipc_node_broadcast(net, skb);
return publ; return p;
} }
/** /**
* tipc_nametbl_withdraw - withdraw name publication from network name tables * tipc_nametbl_withdraw - withdraw a service binding
*/ */
int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 port, int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower,
u32 key) u32 port, u32 key)
{ {
struct publication *publ; struct name_table *nt = tipc_name_table(net);
struct tipc_net *tn = tipc_net(net);
u32 self = tipc_own_addr(net);
struct sk_buff *skb = NULL; struct sk_buff *skb = NULL;
struct tipc_net *tn = net_generic(net, tipc_net_id); struct publication *p;
spin_lock_bh(&tn->nametbl_lock); spin_lock_bh(&tn->nametbl_lock);
publ = tipc_nametbl_remove_publ(net, type, lower, tipc_own_addr(net),
port, key); p = tipc_nametbl_remove_publ(net, type, lower, self, port, key);
if (likely(publ)) { if (p) {
tn->nametbl->local_publ_count--; nt->local_publ_count--;
skb = tipc_named_withdraw(net, publ); skb = tipc_named_withdraw(net, p);
/* Any pending external events? */ /* Any pending external events? */
tipc_named_process_backlog(net); tipc_named_process_backlog(net);
list_del_init(&publ->binding_sock); list_del_init(&p->binding_sock);
kfree_rcu(publ, rcu); kfree_rcu(p, rcu);
} else { } else {
pr_err("Unable to remove local publication\n" pr_err("Failed to remove local publication {%u,%u,%u}/%u\n",
"(type=%u, lower=%u, port=%u, key=%u)\n",
type, lower, port, key); type, lower, port, key);
} }
spin_unlock_bh(&tn->nametbl_lock); spin_unlock_bh(&tn->nametbl_lock);
...@@ -766,27 +682,24 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 port, ...@@ -766,27 +682,24 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 port,
*/ */
void tipc_nametbl_subscribe(struct tipc_subscription *sub) void tipc_nametbl_subscribe(struct tipc_subscription *sub)
{ {
struct name_table *nt = tipc_name_table(sub->net);
struct tipc_net *tn = tipc_net(sub->net); struct tipc_net *tn = tipc_net(sub->net);
struct tipc_subscr *s = &sub->evt.s; struct tipc_subscr *s = &sub->evt.s;
u32 type = tipc_sub_read(s, seq.type); u32 type = tipc_sub_read(s, seq.type);
int index = hash(type); struct tipc_service *sc;
struct name_seq *seq;
struct tipc_name_seq ns;
spin_lock_bh(&tn->nametbl_lock); spin_lock_bh(&tn->nametbl_lock);
seq = nametbl_find_seq(sub->net, type); sc = tipc_service_find(sub->net, type);
if (!seq) if (!sc)
seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); sc = tipc_service_create(type, &nt->services[hash(type)]);
if (seq) { if (sc) {
spin_lock_bh(&seq->lock); spin_lock_bh(&sc->lock);
tipc_nameseq_subscribe(seq, sub); tipc_service_subscribe(sc, sub);
spin_unlock_bh(&seq->lock); spin_unlock_bh(&sc->lock);
} else { } else {
ns.type = tipc_sub_read(s, seq.type); pr_warn("Failed to subscribe for {%u,%u,%u}\n", type,
ns.lower = tipc_sub_read(s, seq.lower); tipc_sub_read(s, seq.lower),
ns.upper = tipc_sub_read(s, seq.upper); tipc_sub_read(s, seq.upper));
pr_warn("Failed to create subscription for {%u,%u,%u}\n",
ns.type, ns.lower, ns.upper);
} }
spin_unlock_bh(&tn->nametbl_lock); spin_unlock_bh(&tn->nametbl_lock);
} }
...@@ -796,124 +709,122 @@ void tipc_nametbl_subscribe(struct tipc_subscription *sub) ...@@ -796,124 +709,122 @@ void tipc_nametbl_subscribe(struct tipc_subscription *sub)
*/ */
void tipc_nametbl_unsubscribe(struct tipc_subscription *sub) void tipc_nametbl_unsubscribe(struct tipc_subscription *sub)
{ {
struct tipc_subscr *s = &sub->evt.s;
struct tipc_net *tn = tipc_net(sub->net); struct tipc_net *tn = tipc_net(sub->net);
struct name_seq *seq; struct tipc_subscr *s = &sub->evt.s;
u32 type = tipc_sub_read(s, seq.type); u32 type = tipc_sub_read(s, seq.type);
struct tipc_service *sc;
spin_lock_bh(&tn->nametbl_lock); spin_lock_bh(&tn->nametbl_lock);
seq = nametbl_find_seq(sub->net, type); sc = tipc_service_find(sub->net, type);
if (seq != NULL) { if (!sc)
spin_lock_bh(&seq->lock); goto exit;
list_del_init(&sub->nameseq_list);
tipc_sub_put(sub); spin_lock_bh(&sc->lock);
if (!seq->first_free && list_empty(&seq->subscriptions)) { list_del_init(&sub->service_list);
hlist_del_init_rcu(&seq->ns_list); tipc_sub_put(sub);
kfree(seq->sseqs);
spin_unlock_bh(&seq->lock); /* Delete service item if no more publications and subscriptions */
kfree_rcu(seq, rcu); if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) {
} else { hlist_del_init_rcu(&sc->service_list);
spin_unlock_bh(&seq->lock); kfree_rcu(sc, rcu);
}
} }
spin_unlock_bh(&sc->lock);
exit:
spin_unlock_bh(&tn->nametbl_lock); spin_unlock_bh(&tn->nametbl_lock);
} }
int tipc_nametbl_init(struct net *net) int tipc_nametbl_init(struct net *net)
{ {
struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_net *tn = tipc_net(net);
struct name_table *tipc_nametbl; struct name_table *nt;
int i; int i;
tipc_nametbl = kzalloc(sizeof(*tipc_nametbl), GFP_ATOMIC); nt = kzalloc(sizeof(*nt), GFP_ATOMIC);
if (!tipc_nametbl) if (!nt)
return -ENOMEM; return -ENOMEM;
for (i = 0; i < TIPC_NAMETBL_SIZE; i++) for (i = 0; i < TIPC_NAMETBL_SIZE; i++)
INIT_HLIST_HEAD(&tipc_nametbl->seq_hlist[i]); INIT_HLIST_HEAD(&nt->services[i]);
INIT_LIST_HEAD(&tipc_nametbl->node_scope); INIT_LIST_HEAD(&nt->node_scope);
INIT_LIST_HEAD(&tipc_nametbl->cluster_scope); INIT_LIST_HEAD(&nt->cluster_scope);
tn->nametbl = tipc_nametbl; tn->nametbl = nt;
spin_lock_init(&tn->nametbl_lock); spin_lock_init(&tn->nametbl_lock);
return 0; return 0;
} }
/** /**
* tipc_purge_publications - remove all publications for a given type * tipc_service_delete - purge all publications for a service and delete it
*
* tipc_nametbl_lock must be held when calling this function
*/ */
static void tipc_purge_publications(struct net *net, struct name_seq *seq) static void tipc_service_delete(struct net *net, struct tipc_service *sc)
{ {
struct publication *publ, *safe; struct service_range *sr, *tmpr;
struct sub_seq *sseq; struct publication *p, *tmpb;
struct name_info *info;
spin_lock_bh(&sc->lock);
spin_lock_bh(&seq->lock); rbtree_postorder_for_each_entry_safe(sr, tmpr, &sc->ranges, tree_node) {
sseq = seq->sseqs; list_for_each_entry_safe(p, tmpb,
info = sseq->info; &sr->all_publ, all_publ) {
list_for_each_entry_safe(publ, safe, &info->all_publ, all_publ) { tipc_service_remove_publ(net, sc, p->lower, p->node,
tipc_nameseq_remove_publ(net, seq, publ->lower, publ->node, p->port, p->key);
publ->port, publ->key); kfree_rcu(p, rcu);
kfree_rcu(publ, rcu); }
} }
hlist_del_init_rcu(&seq->ns_list); hlist_del_init_rcu(&sc->service_list);
kfree(seq->sseqs); spin_unlock_bh(&sc->lock);
spin_unlock_bh(&seq->lock); kfree_rcu(sc, rcu);
kfree_rcu(seq, rcu);
} }
void tipc_nametbl_stop(struct net *net) void tipc_nametbl_stop(struct net *net)
{ {
struct name_table *nt = tipc_name_table(net);
struct tipc_net *tn = tipc_net(net);
struct hlist_head *service_head;
struct tipc_service *service;
u32 i; u32 i;
struct name_seq *seq;
struct hlist_head *seq_head;
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct name_table *tipc_nametbl = tn->nametbl;
/* Verify name table is empty and purge any lingering /* Verify name table is empty and purge any lingering
* publications, then release the name table * publications, then release the name table
*/ */
spin_lock_bh(&tn->nametbl_lock); spin_lock_bh(&tn->nametbl_lock);
for (i = 0; i < TIPC_NAMETBL_SIZE; i++) { for (i = 0; i < TIPC_NAMETBL_SIZE; i++) {
if (hlist_empty(&tipc_nametbl->seq_hlist[i])) if (hlist_empty(&nt->services[i]))
continue; continue;
seq_head = &tipc_nametbl->seq_hlist[i]; service_head = &nt->services[i];
hlist_for_each_entry_rcu(seq, seq_head, ns_list) { hlist_for_each_entry_rcu(service, service_head, service_list) {
tipc_purge_publications(net, seq); tipc_service_delete(net, service);
} }
} }
spin_unlock_bh(&tn->nametbl_lock); spin_unlock_bh(&tn->nametbl_lock);
synchronize_net(); synchronize_net();
kfree(tipc_nametbl); kfree(nt);
} }
static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg, static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg,
struct name_seq *seq, struct tipc_service *service,
struct sub_seq *sseq, u32 *last_publ) struct service_range *sr,
u32 *last_key)
{ {
void *hdr;
struct nlattr *attrs;
struct nlattr *publ;
struct publication *p; struct publication *p;
struct nlattr *attrs;
struct nlattr *b;
void *hdr;
if (*last_publ) { if (*last_key) {
list_for_each_entry(p, &sseq->info->all_publ, all_publ) list_for_each_entry(p, &sr->all_publ, all_publ)
if (p->key == *last_publ) if (p->key == *last_key)
break; break;
if (p->key != *last_publ) if (p->key != *last_key)
return -EPIPE; return -EPIPE;
} else { } else {
p = list_first_entry(&sseq->info->all_publ, struct publication, p = list_first_entry(&sr->all_publ,
struct publication,
all_publ); all_publ);
} }
list_for_each_entry_from(p, &sseq->info->all_publ, all_publ) { list_for_each_entry_from(p, &sr->all_publ, all_publ) {
*last_publ = p->key; *last_key = p->key;
hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, hdr = genlmsg_put(msg->skb, msg->portid, msg->seq,
&tipc_genl_family, NLM_F_MULTI, &tipc_genl_family, NLM_F_MULTI,
...@@ -925,15 +836,15 @@ static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg, ...@@ -925,15 +836,15 @@ static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg,
if (!attrs) if (!attrs)
goto msg_full; goto msg_full;
publ = nla_nest_start(msg->skb, TIPC_NLA_NAME_TABLE_PUBL); b = nla_nest_start(msg->skb, TIPC_NLA_NAME_TABLE_PUBL);
if (!publ) if (!b)
goto attr_msg_full; goto attr_msg_full;
if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_TYPE, seq->type)) if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_TYPE, service->type))
goto publ_msg_full; goto publ_msg_full;
if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_LOWER, sseq->lower)) if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_LOWER, sr->lower))
goto publ_msg_full; goto publ_msg_full;
if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_UPPER, sseq->upper)) if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_UPPER, sr->upper))
goto publ_msg_full; goto publ_msg_full;
if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_SCOPE, p->scope)) if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_SCOPE, p->scope))
goto publ_msg_full; goto publ_msg_full;
...@@ -944,16 +855,16 @@ static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg, ...@@ -944,16 +855,16 @@ static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg,
if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_KEY, p->key)) if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_KEY, p->key))
goto publ_msg_full; goto publ_msg_full;
nla_nest_end(msg->skb, publ); nla_nest_end(msg->skb, b);
nla_nest_end(msg->skb, attrs); nla_nest_end(msg->skb, attrs);
genlmsg_end(msg->skb, hdr); genlmsg_end(msg->skb, hdr);
} }
*last_publ = 0; *last_key = 0;
return 0; return 0;
publ_msg_full: publ_msg_full:
nla_nest_cancel(msg->skb, publ); nla_nest_cancel(msg->skb, b);
attr_msg_full: attr_msg_full:
nla_nest_cancel(msg->skb, attrs); nla_nest_cancel(msg->skb, attrs);
msg_full: msg_full:
...@@ -962,39 +873,34 @@ static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg, ...@@ -962,39 +873,34 @@ static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg,
return -EMSGSIZE; return -EMSGSIZE;
} }
static int __tipc_nl_subseq_list(struct tipc_nl_msg *msg, struct name_seq *seq, static int __tipc_nl_service_range_list(struct tipc_nl_msg *msg,
u32 *last_lower, u32 *last_publ) struct tipc_service *sc,
u32 *last_lower, u32 *last_key)
{ {
struct sub_seq *sseq; struct service_range *sr;
struct sub_seq *sseq_start; struct rb_node *n;
int err; int err;
if (*last_lower) { for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
sseq_start = nameseq_find_subseq(seq, *last_lower); sr = container_of(n, struct service_range, tree_node);
if (!sseq_start) if (sr->lower < *last_lower)
return -EPIPE; continue;
} else { err = __tipc_nl_add_nametable_publ(msg, sc, sr, last_key);
sseq_start = seq->sseqs;
}
for (sseq = sseq_start; sseq != &seq->sseqs[seq->first_free]; sseq++) {
err = __tipc_nl_add_nametable_publ(msg, seq, sseq, last_publ);
if (err) { if (err) {
*last_lower = sseq->lower; *last_lower = sr->lower;
return err; return err;
} }
} }
*last_lower = 0; *last_lower = 0;
return 0; return 0;
} }
static int tipc_nl_seq_list(struct net *net, struct tipc_nl_msg *msg, static int tipc_nl_service_list(struct net *net, struct tipc_nl_msg *msg,
u32 *last_type, u32 *last_lower, u32 *last_publ) u32 *last_type, u32 *last_lower, u32 *last_key)
{ {
struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_net *tn = tipc_net(net);
struct hlist_head *seq_head; struct tipc_service *service = NULL;
struct name_seq *seq = NULL; struct hlist_head *head;
int err; int err;
int i; int i;
...@@ -1004,30 +910,31 @@ static int tipc_nl_seq_list(struct net *net, struct tipc_nl_msg *msg, ...@@ -1004,30 +910,31 @@ static int tipc_nl_seq_list(struct net *net, struct tipc_nl_msg *msg,
i = 0; i = 0;
for (; i < TIPC_NAMETBL_SIZE; i++) { for (; i < TIPC_NAMETBL_SIZE; i++) {
seq_head = &tn->nametbl->seq_hlist[i]; head = &tn->nametbl->services[i];
if (*last_type) { if (*last_type) {
seq = nametbl_find_seq(net, *last_type); service = tipc_service_find(net, *last_type);
if (!seq) if (!service)
return -EPIPE; return -EPIPE;
} else { } else {
hlist_for_each_entry_rcu(seq, seq_head, ns_list) hlist_for_each_entry_rcu(service, head, service_list)
break; break;
if (!seq) if (!service)
continue; continue;
} }
hlist_for_each_entry_from_rcu(seq, ns_list) { hlist_for_each_entry_from_rcu(service, service_list) {
spin_lock_bh(&seq->lock); spin_lock_bh(&service->lock);
err = __tipc_nl_subseq_list(msg, seq, last_lower, err = __tipc_nl_service_range_list(msg, service,
last_publ); last_lower,
last_key);
if (err) { if (err) {
*last_type = seq->type; *last_type = service->type;
spin_unlock_bh(&seq->lock); spin_unlock_bh(&service->lock);
return err; return err;
} }
spin_unlock_bh(&seq->lock); spin_unlock_bh(&service->lock);
} }
*last_type = 0; *last_type = 0;
} }
...@@ -1036,13 +943,13 @@ static int tipc_nl_seq_list(struct net *net, struct tipc_nl_msg *msg, ...@@ -1036,13 +943,13 @@ static int tipc_nl_seq_list(struct net *net, struct tipc_nl_msg *msg,
int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb) int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
{ {
int err; struct net *net = sock_net(skb->sk);
int done = cb->args[3];
u32 last_type = cb->args[0]; u32 last_type = cb->args[0];
u32 last_lower = cb->args[1]; u32 last_lower = cb->args[1];
u32 last_publ = cb->args[2]; u32 last_key = cb->args[2];
struct net *net = sock_net(skb->sk); int done = cb->args[3];
struct tipc_nl_msg msg; struct tipc_nl_msg msg;
int err;
if (done) if (done)
return 0; return 0;
...@@ -1052,7 +959,8 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb) ...@@ -1052,7 +959,8 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
msg.seq = cb->nlh->nlmsg_seq; msg.seq = cb->nlh->nlmsg_seq;
rcu_read_lock(); rcu_read_lock();
err = tipc_nl_seq_list(net, &msg, &last_type, &last_lower, &last_publ); err = tipc_nl_service_list(net, &msg, &last_type,
&last_lower, &last_key);
if (!err) { if (!err) {
done = 1; done = 1;
} else if (err != -EMSGSIZE) { } else if (err != -EMSGSIZE) {
...@@ -1068,7 +976,7 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb) ...@@ -1068,7 +976,7 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
cb->args[0] = last_type; cb->args[0] = last_type;
cb->args[1] = last_lower; cb->args[1] = last_lower;
cb->args[2] = last_publ; cb->args[2] = last_key;
cb->args[3] = done; cb->args[3] = done;
return skb->len; return skb->len;
......
...@@ -97,7 +97,7 @@ struct publication { ...@@ -97,7 +97,7 @@ struct publication {
* @local_publ_count: number of publications issued by this node * @local_publ_count: number of publications issued by this node
*/ */
struct name_table { struct name_table {
struct hlist_head seq_hlist[TIPC_NAMETBL_SIZE]; struct hlist_head services[TIPC_NAMETBL_SIZE];
struct list_head node_scope; struct list_head node_scope;
struct list_head cluster_scope; struct list_head cluster_scope;
u32 local_publ_count; u32 local_publ_count;
......
...@@ -324,12 +324,12 @@ static void tipc_node_write_unlock(struct tipc_node *n) ...@@ -324,12 +324,12 @@ static void tipc_node_write_unlock(struct tipc_node *n)
if (flags & TIPC_NOTIFY_LINK_UP) { if (flags & TIPC_NOTIFY_LINK_UP) {
tipc_mon_peer_up(net, addr, bearer_id); tipc_mon_peer_up(net, addr, bearer_id);
tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr, tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
TIPC_NODE_SCOPE, link_id, addr); TIPC_NODE_SCOPE, link_id, link_id);
} }
if (flags & TIPC_NOTIFY_LINK_DOWN) { if (flags & TIPC_NOTIFY_LINK_DOWN) {
tipc_mon_peer_down(net, addr, bearer_id); tipc_mon_peer_down(net, addr, bearer_id);
tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr, tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
link_id, addr); link_id, link_id);
} }
} }
......
...@@ -40,7 +40,7 @@ ...@@ -40,7 +40,7 @@
#include "topsrv.h" #include "topsrv.h"
#define TIPC_MAX_SUBSCR 65535 #define TIPC_MAX_SUBSCR 65535
#define TIPC_MAX_PUBLICATIONS 65535 #define TIPC_MAX_PUBL 65535
struct tipc_subscription; struct tipc_subscription;
struct tipc_conn; struct tipc_conn;
...@@ -58,7 +58,7 @@ struct tipc_subscription { ...@@ -58,7 +58,7 @@ struct tipc_subscription {
struct kref kref; struct kref kref;
struct net *net; struct net *net;
struct timer_list timer; struct timer_list timer;
struct list_head nameseq_list; struct list_head service_list;
struct list_head sub_list; struct list_head sub_list;
struct tipc_event evt; struct tipc_event evt;
int conid; int conid;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment