Commit fdb533c3 authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc-improvements-to-group-messaging'

Jon Maloy says:

====================
tipc: improvements to group messaging

We make a number of simplifications and improvements to the group
messaging service. They aim at readability/maintainability of the code
as well as scalability.

The series is based on commit f9c935db ("tipc: fix problems with
multipoint-to-point flow control) which has been applied to 'net' but
not yet to 'net-next'.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents a67c01e2 eb929a91
......@@ -117,10 +117,9 @@ static inline unsigned int tipc_node(__u32 addr)
/*
* Publication scopes when binding port names and port name sequences
*/
#define TIPC_ZONE_SCOPE 1
#define TIPC_CLUSTER_SCOPE 2
#define TIPC_NODE_SCOPE 3
#define TIPC_ZONE_SCOPE 1
#define TIPC_CLUSTER_SCOPE 2
#define TIPC_NODE_SCOPE 3
/*
* Limiting values for messages
......
......@@ -49,8 +49,6 @@
#define ADV_ACTIVE (ADV_UNIT * 12)
enum mbr_state {
MBR_QUARANTINED,
MBR_DISCOVERED,
MBR_JOINING,
MBR_PUBLISHED,
MBR_JOINED,
......@@ -65,7 +63,6 @@ struct tipc_member {
struct rb_node tree_node;
struct list_head list;
struct list_head small_win;
struct sk_buff *event_msg;
struct sk_buff_head deferredq;
struct tipc_group *group;
u32 node;
......@@ -77,7 +74,6 @@ struct tipc_member {
u16 bc_rcv_nxt;
u16 bc_syncpt;
u16 bc_acked;
bool usr_pending;
};
struct tipc_group {
......@@ -85,13 +81,11 @@ struct tipc_group {
struct list_head small_win;
struct list_head pending;
struct list_head active;
struct list_head reclaiming;
struct tipc_nlist dests;
struct net *net;
int subid;
u32 type;
u32 instance;
u32 domain;
u32 scope;
u32 portid;
u16 member_cnt;
......@@ -101,11 +95,27 @@ struct tipc_group {
u16 bc_ackers;
bool loopback;
bool events;
bool open;
};
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
int mtyp, struct sk_buff_head *xmitq);
bool tipc_group_is_open(struct tipc_group *grp)
{
return grp->open;
}
static void tipc_group_open(struct tipc_member *m, bool *wakeup)
{
*wakeup = false;
if (list_empty(&m->small_win))
return;
list_del_init(&m->small_win);
m->group->open = true;
*wakeup = true;
}
static void tipc_group_decr_active(struct tipc_group *grp,
struct tipc_member *m)
{
......@@ -139,12 +149,12 @@ u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
static bool tipc_group_is_receiver(struct tipc_member *m)
{
return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING;
return m && m->state != MBR_JOINING && m->state != MBR_LEAVING;
}
static bool tipc_group_is_sender(struct tipc_member *m)
{
return m && m->state >= MBR_JOINED;
return m && m->state != MBR_JOINING && m->state != MBR_PUBLISHED;
}
u32 tipc_group_exclude(struct tipc_group *grp)
......@@ -162,6 +172,8 @@ int tipc_group_size(struct tipc_group *grp)
struct tipc_group *tipc_group_create(struct net *net, u32 portid,
struct tipc_group_req *mreq)
{
u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS;
bool global = mreq->scope != TIPC_NODE_SCOPE;
struct tipc_group *grp;
u32 type = mreq->type;
......@@ -172,22 +184,37 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
INIT_LIST_HEAD(&grp->small_win);
INIT_LIST_HEAD(&grp->active);
INIT_LIST_HEAD(&grp->pending);
INIT_LIST_HEAD(&grp->reclaiming);
grp->members = RB_ROOT;
grp->net = net;
grp->portid = portid;
grp->domain = addr_domain(net, mreq->scope);
grp->type = type;
grp->instance = mreq->instance;
grp->scope = mreq->scope;
grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE;
if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0,
filter, &grp->subid))
return grp;
kfree(grp);
return NULL;
}
void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf)
{
struct rb_root *tree = &grp->members;
struct tipc_member *m, *tmp;
struct sk_buff_head xmitq;
skb_queue_head_init(&xmitq);
rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq);
tipc_group_update_member(m, 0);
}
tipc_node_distr_xmit(net, &xmitq);
*sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
}
void tipc_group_delete(struct net *net, struct tipc_group *grp)
{
struct rb_root *tree = &grp->members;
......@@ -278,7 +305,7 @@ static void tipc_group_add_to_tree(struct tipc_group *grp,
static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
u32 node, u32 port,
int state)
u32 instance, int state)
{
struct tipc_member *m;
......@@ -291,6 +318,7 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
m->group = grp;
m->node = node;
m->port = port;
m->instance = instance;
m->bc_acked = grp->bc_snd_nxt - 1;
grp->member_cnt++;
tipc_group_add_to_tree(grp, m);
......@@ -299,9 +327,10 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
return m;
}
void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port)
void tipc_group_add_member(struct tipc_group *grp, u32 node,
u32 port, u32 instance)
{
tipc_group_create_member(grp, node, port, MBR_DISCOVERED);
tipc_group_create_member(grp, node, port, instance, MBR_PUBLISHED);
}
static void tipc_group_delete_member(struct tipc_group *grp,
......@@ -392,20 +421,20 @@ bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
int adv, state;
m = tipc_group_find_dest(grp, dnode, dport);
*mbr = m;
if (!m)
if (!tipc_group_is_receiver(m)) {
*mbr = NULL;
return false;
if (m->usr_pending)
return true;
}
*mbr = m;
if (m->window >= len)
return false;
m->usr_pending = true;
grp->open = false;
/* If not fully advertised, do it now to prevent mutual blocking */
adv = m->advertised;
state = m->state;
if (state < MBR_JOINED)
return true;
if (state == MBR_JOINED && adv == ADV_IDLE)
return true;
if (state == MBR_ACTIVE && adv == ADV_ACTIVE)
......@@ -423,9 +452,10 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len)
struct tipc_member *m = NULL;
/* If prev bcast was replicast, reject until all receivers have acked */
if (grp->bc_ackers)
if (grp->bc_ackers) {
grp->open = false;
return true;
}
if (list_empty(&grp->small_win))
return false;
......@@ -571,24 +601,34 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
switch (m->state) {
case MBR_JOINED:
/* Reclaim advertised space from least active member */
if (!list_empty(active) && active_cnt >= reclaim_limit) {
/* First, decide if member can go active */
if (active_cnt <= max_active) {
m->state = MBR_ACTIVE;
list_add_tail(&m->list, active);
grp->active_cnt++;
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
} else {
m->state = MBR_PENDING;
list_add_tail(&m->list, &grp->pending);
}
if (active_cnt < reclaim_limit)
break;
/* Reclaim from oldest active member, if possible */
if (!list_empty(active)) {
rm = list_first_entry(active, struct tipc_member, list);
rm->state = MBR_RECLAIMING;
list_move_tail(&rm->list, &grp->reclaiming);
list_del_init(&rm->list);
tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq);
}
/* If max active, become pending and wait for reclaimed space */
if (active_cnt >= max_active) {
m->state = MBR_PENDING;
list_add_tail(&m->list, &grp->pending);
break;
}
/* Otherwise become active */
m->state = MBR_ACTIVE;
list_add_tail(&m->list, &grp->active);
grp->active_cnt++;
/* Fall through */
/* Nobody to reclaim from; - revert oldest pending to JOINED */
pm = list_first_entry(&grp->pending, struct tipc_member, list);
list_del_init(&pm->list);
pm->state = MBR_JOINED;
tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
break;
case MBR_ACTIVE:
if (!list_is_last(&m->list, &grp->active))
list_move_tail(&m->list, &grp->active);
......@@ -600,12 +640,12 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
if (m->advertised > ADV_IDLE)
break;
m->state = MBR_JOINED;
grp->active_cnt--;
if (m->advertised < ADV_IDLE) {
pr_warn_ratelimited("Rcv unexpected msg after REMIT\n");
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
}
grp->active_cnt--;
list_del_init(&m->list);
if (list_empty(&grp->pending))
return;
......@@ -617,7 +657,6 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
break;
case MBR_RECLAIMING:
case MBR_DISCOVERED:
case MBR_JOINING:
case MBR_LEAVING:
default:
......@@ -625,6 +664,40 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
}
}
static void tipc_group_create_event(struct tipc_group *grp,
struct tipc_member *m,
u32 event, u16 seqno,
struct sk_buff_head *inputq)
{ u32 dnode = tipc_own_addr(grp->net);
struct tipc_event evt;
struct sk_buff *skb;
struct tipc_msg *hdr;
evt.event = event;
evt.found_lower = m->instance;
evt.found_upper = m->instance;
evt.port.ref = m->port;
evt.port.node = m->node;
evt.s.seq.type = grp->type;
evt.s.seq.lower = m->instance;
evt.s.seq.upper = m->instance;
skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT,
GROUP_H_SIZE, sizeof(evt), dnode, m->node,
grp->portid, m->port, 0);
if (!skb)
return;
hdr = buf_msg(skb);
msg_set_nametype(hdr, grp->type);
msg_set_grp_evt(hdr, event);
msg_set_dest_droppable(hdr, true);
msg_set_grp_bc_seqno(hdr, seqno);
memcpy(msg_data(hdr), &evt, sizeof(evt));
TIPC_SKB_CB(skb)->orig_member = m->instance;
__skb_queue_tail(inputq, skb);
}
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
int mtyp, struct sk_buff_head *xmitq)
{
......@@ -670,83 +743,73 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
u32 node = msg_orignode(hdr);
u32 port = msg_origport(hdr);
struct tipc_member *m, *pm;
struct tipc_msg *ehdr;
u16 remitted, in_flight;
if (!grp)
return;
if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net))
return;
m = tipc_group_find_member(grp, node, port);
switch (msg_type(hdr)) {
case GRP_JOIN_MSG:
if (!m)
m = tipc_group_create_member(grp, node, port,
MBR_QUARANTINED);
0, MBR_JOINING);
if (!m)
return;
m->bc_syncpt = msg_grp_bc_syncpt(hdr);
m->bc_rcv_nxt = m->bc_syncpt;
m->window += msg_adv_win(hdr);
/* Wait until PUBLISH event is received */
if (m->state == MBR_DISCOVERED) {
m->state = MBR_JOINING;
} else if (m->state == MBR_PUBLISHED) {
m->state = MBR_JOINED;
*usr_wakeup = true;
m->usr_pending = false;
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
ehdr = buf_msg(m->event_msg);
msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
__skb_queue_tail(inputq, m->event_msg);
}
list_del_init(&m->small_win);
/* Wait until PUBLISH event is received if necessary */
if (m->state != MBR_PUBLISHED)
return;
/* Member can be taken into service */
m->state = MBR_JOINED;
tipc_group_open(m, usr_wakeup);
tipc_group_update_member(m, 0);
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
tipc_group_create_event(grp, m, TIPC_PUBLISHED,
m->bc_syncpt, inputq);
return;
case GRP_LEAVE_MSG:
if (!m)
return;
m->bc_syncpt = msg_grp_bc_syncpt(hdr);
list_del_init(&m->list);
list_del_init(&m->small_win);
*usr_wakeup = true;
/* Wait until WITHDRAW event is received */
if (m->state != MBR_LEAVING) {
tipc_group_decr_active(grp, m);
m->state = MBR_LEAVING;
return;
}
/* Otherwise deliver already received WITHDRAW event */
ehdr = buf_msg(m->event_msg);
msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
__skb_queue_tail(inputq, m->event_msg);
tipc_group_open(m, usr_wakeup);
tipc_group_decr_active(grp, m);
m->state = MBR_LEAVING;
tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
m->bc_syncpt, inputq);
return;
case GRP_ADV_MSG:
if (!m)
return;
m->window += msg_adv_win(hdr);
*usr_wakeup = m->usr_pending;
m->usr_pending = false;
list_del_init(&m->small_win);
tipc_group_open(m, usr_wakeup);
return;
case GRP_ACK_MSG:
if (!m)
return;
m->bc_acked = msg_grp_bc_acked(hdr);
if (--grp->bc_ackers)
break;
return;
list_del_init(&m->small_win);
m->group->open = true;
*usr_wakeup = true;
m->usr_pending = false;
tipc_group_update_member(m, 0);
return;
case GRP_RECLAIM_MSG:
if (!m)
return;
*usr_wakeup = m->usr_pending;
m->usr_pending = false;
tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq);
m->window = ADV_IDLE;
tipc_group_open(m, usr_wakeup);
return;
case GRP_REMIT_MSG:
if (!m || m->state != MBR_RECLAIMING)
......@@ -761,18 +824,14 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
m->advertised = ADV_IDLE + in_flight;
return;
}
/* All messages preceding the REMIT have been read */
if (m->advertised <= remitted) {
m->state = MBR_JOINED;
in_flight = 0;
}
/* ..and the REMIT overtaken by more messages => re-advertise */
/* This should never happen */
if (m->advertised < remitted)
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
pr_warn_ratelimited("Unexpected REMIT msg\n");
m->advertised = ADV_IDLE + in_flight;
/* All messages preceding the REMIT have been read */
m->state = MBR_JOINED;
grp->active_cnt--;
list_del_init(&m->list);
m->advertised = ADV_IDLE;
/* Set oldest pending member to active and advertise */
if (list_empty(&grp->pending))
......@@ -794,11 +853,10 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
void tipc_group_member_evt(struct tipc_group *grp,
bool *usr_wakeup,
int *sk_rcvbuf,
struct sk_buff *skb,
struct tipc_msg *hdr,
struct sk_buff_head *inputq,
struct sk_buff_head *xmitq)
{
struct tipc_msg *hdr = buf_msg(skb);
struct tipc_event *evt = (void *)msg_data(hdr);
u32 instance = evt->found_lower;
u32 node = evt->port.node;
......@@ -806,86 +864,59 @@ void tipc_group_member_evt(struct tipc_group *grp,
int event = evt->event;
struct tipc_member *m;
struct net *net;
bool node_up;
u32 self;
if (!grp)
goto drop;
return;
net = grp->net;
self = tipc_own_addr(net);
if (!grp->loopback && node == self && port == grp->portid)
goto drop;
/* Convert message before delivery to user */
msg_set_hdr_sz(hdr, GROUP_H_SIZE);
msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
msg_set_type(hdr, TIPC_GRP_MEMBER_EVT);
msg_set_origport(hdr, port);
msg_set_orignode(hdr, node);
msg_set_nametype(hdr, grp->type);
msg_set_grp_evt(hdr, event);
return;
m = tipc_group_find_member(grp, node, port);
if (event == TIPC_PUBLISHED) {
if (!m)
m = tipc_group_create_member(grp, node, port,
MBR_DISCOVERED);
if (!m)
goto drop;
/* Hold back event if JOIN message not yet received */
if (m->state == MBR_DISCOVERED) {
m->event_msg = skb;
m->state = MBR_PUBLISHED;
} else {
msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
__skb_queue_tail(inputq, skb);
m->state = MBR_JOINED;
*usr_wakeup = true;
m->usr_pending = false;
switch (event) {
case TIPC_PUBLISHED:
/* Send and wait for arrival of JOIN message if necessary */
if (!m) {
m = tipc_group_create_member(grp, node, port, instance,
MBR_PUBLISHED);
if (!m)
break;
tipc_group_update_member(m, 0);
tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
break;
}
if (m->state != MBR_JOINING)
break;
/* Member can be taken into service */
m->instance = instance;
TIPC_SKB_CB(skb)->orig_member = m->instance;
tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
m->state = MBR_JOINED;
tipc_group_open(m, usr_wakeup);
tipc_group_update_member(m, 0);
} else if (event == TIPC_WITHDRAWN) {
tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
tipc_group_create_event(grp, m, TIPC_PUBLISHED,
m->bc_syncpt, inputq);
break;
case TIPC_WITHDRAWN:
if (!m)
goto drop;
TIPC_SKB_CB(skb)->orig_member = m->instance;
break;
*usr_wakeup = true;
m->usr_pending = false;
node_up = tipc_node_is_up(net, node);
m->event_msg = NULL;
if (node_up) {
/* Hold back event if a LEAVE msg should be expected */
if (m->state != MBR_LEAVING) {
m->event_msg = skb;
tipc_group_decr_active(grp, m);
m->state = MBR_LEAVING;
} else {
msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
__skb_queue_tail(inputq, skb);
}
} else {
if (m->state != MBR_LEAVING) {
tipc_group_decr_active(grp, m);
m->state = MBR_LEAVING;
msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt);
} else {
msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
}
__skb_queue_tail(inputq, skb);
}
tipc_group_decr_active(grp, m);
m->state = MBR_LEAVING;
list_del_init(&m->list);
list_del_init(&m->small_win);
tipc_group_open(m, usr_wakeup);
/* Only send event if no LEAVE message can be expected */
if (!tipc_node_is_up(net, node))
tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
m->bc_rcv_nxt, inputq);
break;
default:
break;
}
*sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
return;
drop:
kfree_skb(skb);
}
......@@ -44,8 +44,10 @@ struct tipc_msg;
struct tipc_group *tipc_group_create(struct net *net, u32 portid,
struct tipc_group_req *mreq);
void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcv_buf);
void tipc_group_delete(struct net *net, struct tipc_group *grp);
void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port);
void tipc_group_add_member(struct tipc_group *grp, u32 node,
u32 port, u32 instance);
struct tipc_nlist *tipc_group_dests(struct tipc_group *grp);
void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
int *scope);
......@@ -54,7 +56,7 @@ void tipc_group_filter_msg(struct tipc_group *grp,
struct sk_buff_head *inputq,
struct sk_buff_head *xmitq);
void tipc_group_member_evt(struct tipc_group *grp, bool *wakeup,
int *sk_rcvbuf, struct sk_buff *skb,
int *sk_rcvbuf, struct tipc_msg *hdr,
struct sk_buff_head *inputq,
struct sk_buff_head *xmitq);
void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup,
......@@ -65,9 +67,9 @@ void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack);
bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
int len, struct tipc_member **m);
bool tipc_group_bc_cong(struct tipc_group *grp, int len);
bool tipc_group_is_open(struct tipc_group *grp);
void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
u32 port, struct sk_buff_head *xmitq);
u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
void tipc_group_update_member(struct tipc_member *m, int len);
int tipc_group_size(struct tipc_group *grp);
#endif
......@@ -328,7 +328,8 @@ static struct publication *tipc_nameseq_insert_publ(struct net *net,
list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
tipc_subscrp_report_overlap(s, publ->lower, publ->upper,
TIPC_PUBLISHED, publ->ref,
publ->node, created_subseq);
publ->node, publ->scope,
created_subseq);
}
return publ;
}
......@@ -398,19 +399,21 @@ static struct publication *tipc_nameseq_remove_publ(struct net *net,
list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
tipc_subscrp_report_overlap(s, publ->lower, publ->upper,
TIPC_WITHDRAWN, publ->ref,
publ->node, removed_subseq);
publ->node, publ->scope,
removed_subseq);
}
return publ;
}
/**
* tipc_nameseq_subscribe - attach a subscription, and issue
* the prescribed number of events if there is any sub-
* tipc_nameseq_subscribe - attach a subscription, and optionally
* issue the prescribed number of events if there is any sub-
* sequence overlapping with the requested sequence
*/
static void tipc_nameseq_subscribe(struct name_seq *nseq,
struct tipc_subscription *s)
struct tipc_subscription *s,
bool status)
{
struct sub_seq *sseq = nseq->sseqs;
struct tipc_name_seq ns;
......@@ -420,7 +423,7 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq,
tipc_subscrp_get(s);
list_add(&s->nameseq_list, &nseq->subscriptions);
if (!sseq)
if (!status || !sseq)
return;
while (sseq != &nseq->sseqs[nseq->first_free]) {
......@@ -434,6 +437,7 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq,
sseq->upper,
TIPC_PUBLISHED,
crs->ref, crs->node,
crs->scope,
must_report);
must_report = 0;
}
......@@ -597,7 +601,7 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance,
return ref;
}
bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope,
struct list_head *dsts, int *dstcnt, u32 exclude,
bool all)
{
......@@ -607,9 +611,6 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
struct name_seq *seq;
struct sub_seq *sseq;
if (!tipc_in_scope(domain, self))
return false;
*dstcnt = 0;
rcu_read_lock();
seq = nametbl_find_seq(net, type);
......@@ -620,7 +621,7 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
if (likely(sseq)) {
info = sseq->info;
list_for_each_entry(publ, &info->zone_list, zone_list) {
if (!tipc_in_scope(domain, publ->node))
if (publ->scope != scope)
continue;
if (publ->ref == exclude && publ->node == self)
continue;
......@@ -638,13 +639,14 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
return !list_empty(dsts);
}
int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
u32 limit, struct list_head *dports)
int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
u32 scope, bool exact, struct list_head *dports)
{
struct name_seq *seq;
struct sub_seq *sseq;
struct sub_seq *sseq_stop;
struct name_info *info;
struct publication *p;
struct name_seq *seq;
struct sub_seq *sseq;
int res = 0;
rcu_read_lock();
......@@ -656,15 +658,12 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
sseq = seq->sseqs + nameseq_locate_subseq(seq, lower);
sseq_stop = seq->sseqs + seq->first_free;
for (; sseq != sseq_stop; sseq++) {
struct publication *publ;
if (sseq->lower > upper)
break;
info = sseq->info;
list_for_each_entry(publ, &info->node_list, node_list) {
if (publ->scope <= limit)
tipc_dest_push(dports, 0, publ->ref);
list_for_each_entry(p, &info->node_list, node_list) {
if (p->scope == scope || (!exact && p->scope < scope))
tipc_dest_push(dports, 0, p->ref);
}
if (info->cluster_list_size != info->node_list_size)
......@@ -681,7 +680,7 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
* - Determines if any node local ports overlap
*/
void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
u32 upper, u32 domain,
u32 upper, u32 scope,
struct tipc_nlist *nodes)
{
struct sub_seq *sseq, *stop;
......@@ -700,7 +699,7 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
for (; sseq != stop && sseq->lower <= upper; sseq++) {
info = sseq->info;
list_for_each_entry(publ, &info->zone_list, zone_list) {
if (tipc_in_scope(domain, publ->node))
if (publ->scope == scope)
tipc_nlist_add(nodes, publ->node);
}
}
......@@ -712,7 +711,7 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
/* tipc_nametbl_build_group - build list of communication group members
*/
void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
u32 type, u32 domain)
u32 type, u32 scope)
{
struct sub_seq *sseq, *stop;
struct name_info *info;
......@@ -730,9 +729,9 @@ void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
for (; sseq != stop; sseq++) {
info = sseq->info;
list_for_each_entry(p, &info->zone_list, zone_list) {
if (!tipc_in_scope(domain, p->node))
if (p->scope != scope)
continue;
tipc_group_add_member(grp, p->node, p->ref);
tipc_group_add_member(grp, p->node, p->ref, p->lower);
}
}
spin_unlock_bh(&seq->lock);
......@@ -811,7 +810,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref,
/**
* tipc_nametbl_subscribe - add a subscription object to the name table
*/
void tipc_nametbl_subscribe(struct tipc_subscription *s)
void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status)
{
struct tipc_net *tn = net_generic(s->net, tipc_net_id);
u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap);
......@@ -825,7 +824,7 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s)
seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]);
if (seq) {
spin_lock_bh(&seq->lock);
tipc_nameseq_subscribe(seq, s);
tipc_nameseq_subscribe(seq, s, status);
spin_unlock_bh(&seq->lock);
} else {
tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns);
......
......@@ -100,8 +100,8 @@ struct name_table {
int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node);
int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
u32 limit, struct list_head *dports);
int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
u32 scope, bool exact, struct list_head *dports);
void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
u32 type, u32 domain);
void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
......@@ -121,7 +121,7 @@ struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type,
struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type,
u32 lower, u32 node, u32 ref,
u32 key);
void tipc_nametbl_subscribe(struct tipc_subscription *s);
void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status);
void tipc_nametbl_unsubscribe(struct tipc_subscription *s);
int tipc_nametbl_init(struct net *net);
void tipc_nametbl_stop(struct net *net);
......
......@@ -489,8 +489,8 @@ void tipc_conn_terminate(struct tipc_server *s, int conid)
}
}
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
u32 lower, u32 upper, int *conid)
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
u32 upper, u32 filter, int *conid)
{
struct tipc_subscriber *scbr;
struct tipc_subscr sub;
......@@ -501,7 +501,7 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
sub.seq.lower = lower;
sub.seq.upper = upper;
sub.timeout = TIPC_WAIT_FOREVER;
sub.filter = TIPC_SUB_PORTS;
sub.filter = filter;
*(u32 *)&sub.usr_handle = port;
con = tipc_alloc_conn(tipc_topsrv(net));
......
......@@ -41,6 +41,9 @@
#include <net/net_namespace.h>
#define TIPC_SERVER_NAME_LEN 32
#define TIPC_SUB_CLUSTER_SCOPE 0x20
#define TIPC_SUB_NODE_SCOPE 0x40
#define TIPC_SUB_NO_STATUS 0x80
/**
* struct tipc_server - TIPC server structure
......@@ -83,8 +86,8 @@ struct tipc_server {
int tipc_conn_sendmsg(struct tipc_server *s, int conid,
struct sockaddr_tipc *addr, void *data, size_t len);
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
u32 lower, u32 upper, int *conid);
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
u32 upper, u32 filter, int *conid);
void tipc_topsrv_kern_unsubscr(struct net *net, int conid);
/**
......
......@@ -715,7 +715,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
{
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_group *grp = tsk->group;
struct tipc_group *grp;
u32 revents = 0;
sock_poll_wait(file, sk_sleep(sk), wait);
......@@ -736,9 +736,9 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
revents |= POLLIN | POLLRDNORM;
break;
case TIPC_OPEN:
if (!grp || tipc_group_size(grp))
if (!tsk->cong_link_cnt)
revents |= POLLOUT;
grp = tsk->group;
if ((!grp || tipc_group_is_open(grp)) && !tsk->cong_link_cnt)
revents |= POLLOUT;
if (!tipc_sk_type_connectionless(sk))
break;
if (skb_queue_empty(&sk->sk_receive_queue))
......@@ -928,21 +928,22 @@ static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
struct list_head *cong_links = &tsk->cong_links;
int blks = tsk_blocks(GROUP_H_SIZE + dlen);
struct tipc_group *grp = tsk->group;
struct tipc_msg *hdr = &tsk->phdr;
struct tipc_member *first = NULL;
struct tipc_member *mbr = NULL;
struct net *net = sock_net(sk);
u32 node, port, exclude;
u32 type, inst, domain;
struct list_head dsts;
u32 type, inst, scope;
int lookups = 0;
int dstcnt, rc;
bool cong;
INIT_LIST_HEAD(&dsts);
type = dest->addr.name.name.type;
type = msg_nametype(hdr);
inst = dest->addr.name.name.instance;
domain = addr_domain(net, dest->scope);
scope = msg_lookup_scope(hdr);
exclude = tipc_group_exclude(grp);
while (++lookups < 4) {
......@@ -950,7 +951,7 @@ static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
/* Look for a non-congested destination member, if any */
while (1) {
if (!tipc_nametbl_lookup(net, type, inst, domain, &dsts,
if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts,
&dstcnt, exclude, false))
return -EHOSTUNREACH;
tipc_dest_pop(&dsts, &node, &port);
......@@ -1079,22 +1080,23 @@ static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
{
struct sock *sk = sock->sk;
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
struct tipc_name_seq *seq = &dest->addr.nameseq;
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_group *grp = tsk->group;
struct tipc_msg *hdr = &tsk->phdr;
struct net *net = sock_net(sk);
u32 domain, exclude, dstcnt;
u32 type, inst, scope, exclude;
struct list_head dsts;
u32 dstcnt;
INIT_LIST_HEAD(&dsts);
if (seq->lower != seq->upper)
return -ENOTSUPP;
domain = addr_domain(net, dest->scope);
type = msg_nametype(hdr);
inst = dest->addr.name.name.instance;
scope = msg_lookup_scope(hdr);
exclude = tipc_group_exclude(grp);
if (!tipc_nametbl_lookup(net, seq->type, seq->lower, domain,
&dsts, &dstcnt, exclude, true))
if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts,
&dstcnt, exclude, true))
return -EHOSTUNREACH;
if (dstcnt == 1) {
......@@ -1116,24 +1118,29 @@ static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
struct sk_buff_head *inputq)
{
u32 scope = TIPC_CLUSTER_SCOPE;
u32 self = tipc_own_addr(net);
u32 type, lower, upper, scope;
struct sk_buff *skb, *_skb;
u32 lower = 0, upper = ~0;
struct sk_buff_head tmpq;
u32 portid, oport, onode;
struct sk_buff_head tmpq;
struct list_head dports;
struct tipc_msg *msg;
int user, mtyp, hsz;
struct tipc_msg *hdr;
int user, mtyp, hlen;
bool exact;
__skb_queue_head_init(&tmpq);
INIT_LIST_HEAD(&dports);
skb = tipc_skb_peek(arrvq, &inputq->lock);
for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
msg = buf_msg(skb);
user = msg_user(msg);
mtyp = msg_type(msg);
hdr = buf_msg(skb);
user = msg_user(hdr);
mtyp = msg_type(hdr);
hlen = skb_headroom(skb) + msg_hdr_sz(hdr);
oport = msg_origport(hdr);
onode = msg_orignode(hdr);
type = msg_nametype(hdr);
if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
spin_lock_bh(&inputq->lock);
if (skb_peek(arrvq) == skb) {
......@@ -1144,21 +1151,31 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
spin_unlock_bh(&inputq->lock);
continue;
}
hsz = skb_headroom(skb) + msg_hdr_sz(msg);
oport = msg_origport(msg);
onode = msg_orignode(msg);
if (onode == self)
scope = TIPC_NODE_SCOPE;
/* Create destination port list and message clones: */
if (!msg_in_group(msg)) {
lower = msg_namelower(msg);
upper = msg_nameupper(msg);
/* Group messages require exact scope match */
if (msg_in_group(hdr)) {
lower = 0;
upper = ~0;
scope = msg_lookup_scope(hdr);
exact = true;
} else {
/* TIPC_NODE_SCOPE means "any scope" in this context */
if (onode == self)
scope = TIPC_NODE_SCOPE;
else
scope = TIPC_CLUSTER_SCOPE;
exact = false;
lower = msg_namelower(hdr);
upper = msg_nameupper(hdr);
}
tipc_nametbl_mc_translate(net, msg_nametype(msg), lower, upper,
scope, &dports);
/* Create destination port list: */
tipc_nametbl_mc_lookup(net, type, lower, upper,
scope, exact, &dports);
/* Clone message per destination */
while (tipc_dest_pop(&dports, NULL, &portid)) {
_skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
_skb = __pskb_copy(skb, hlen, GFP_ATOMIC);
if (_skb) {
msg_set_destport(buf_msg(_skb), portid);
__skb_queue_tail(&tmpq, _skb);
......@@ -1933,8 +1950,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
break;
case TOP_SRV:
tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
skb, inputq, xmitq);
skb = NULL;
hdr, inputq, xmitq);
break;
default:
break;
......@@ -2732,7 +2748,6 @@ void tipc_sk_rht_destroy(struct net *net)
static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
{
struct net *net = sock_net(&tsk->sk);
u32 domain = addr_domain(net, mreq->scope);
struct tipc_group *grp = tsk->group;
struct tipc_msg *hdr = &tsk->phdr;
struct tipc_name_seq seq;
......@@ -2740,6 +2755,8 @@ static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
if (mreq->type < TIPC_RESERVED_TYPES)
return -EACCES;
if (mreq->scope > TIPC_NODE_SCOPE)
return -EINVAL;
if (grp)
return -EACCES;
grp = tipc_group_create(net, tsk->portid, mreq);
......@@ -2752,16 +2769,16 @@ static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
seq.type = mreq->type;
seq.lower = mreq->instance;
seq.upper = seq.lower;
tipc_nametbl_build_group(net, grp, mreq->type, domain);
tipc_nametbl_build_group(net, grp, mreq->type, mreq->scope);
rc = tipc_sk_publish(tsk, mreq->scope, &seq);
if (rc) {
tipc_group_delete(net, grp);
tsk->group = NULL;
}
/* Eliminate any risk that a broadcast overtakes the sent JOIN */
/* Eliminate any risk that a broadcast overtakes sent JOINs */
tsk->mc_method.rcast = true;
tsk->mc_method.mandatory = true;
tipc_group_join(net, grp, &tsk->sk.sk_rcvbuf);
return rc;
}
......
......@@ -118,15 +118,19 @@ void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap,
void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower,
u32 found_upper, u32 event, u32 port_ref,
u32 node, int must)
u32 node, u32 scope, int must)
{
u32 filter = htohl(sub->evt.s.filter, sub->swap);
struct tipc_name_seq seq;
tipc_subscrp_convert_seq(&sub->evt.s.seq, sub->swap, &seq);
if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper))
return;
if (!must &&
!(htohl(sub->evt.s.filter, sub->swap) & TIPC_SUB_PORTS))
if (!must && !(filter & TIPC_SUB_PORTS))
return;
if (filter & TIPC_SUB_CLUSTER_SCOPE && scope == TIPC_NODE_SCOPE)
return;
if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE)
return;
tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref,
......@@ -286,7 +290,8 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
}
static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,
struct tipc_subscriber *subscriber, int swap)
struct tipc_subscriber *subscriber, int swap,
bool status)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_subscription *sub = NULL;
......@@ -299,7 +304,7 @@ static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,
spin_lock_bh(&subscriber->lock);
list_add(&sub->subscrp_list, &subscriber->subscrp_list);
sub->subscriber = subscriber;
tipc_nametbl_subscribe(sub);
tipc_nametbl_subscribe(sub, status);
tipc_subscrb_get(subscriber);
spin_unlock_bh(&subscriber->lock);
......@@ -323,6 +328,7 @@ static void tipc_subscrb_rcv_cb(struct net *net, int conid,
{
struct tipc_subscriber *subscriber = usr_data;
struct tipc_subscr *s = (struct tipc_subscr *)buf;
bool status;
int swap;
/* Determine subscriber's endianness */
......@@ -334,8 +340,8 @@ static void tipc_subscrb_rcv_cb(struct net *net, int conid,
s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
return tipc_subscrp_cancel(s, subscriber);
}
tipc_subscrp_subscribe(net, s, subscriber, swap);
status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
tipc_subscrp_subscribe(net, s, subscriber, swap, status);
}
/* Handle one request to establish a new subscriber */
......
......@@ -71,7 +71,7 @@ int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
u32 found_upper);
void tipc_subscrp_report_overlap(struct tipc_subscription *sub,
u32 found_lower, u32 found_upper, u32 event,
u32 port_ref, u32 node, int must);
u32 port_ref, u32 node, u32 scope, int must);
void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap,
struct tipc_name_seq *out);
u32 tipc_subscrp_convert_seq_type(u32 type, int swap);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment