Commit 5b8dddb6 authored by Jon Maloy's avatar Jon Maloy Committed by David S. Miller

tipc: introduce group multicast messaging

The previously introduced message transport to all group members is
based on the tipc multicast service, but is logically a broadcast
service within the group, and that is what we call it.

We now add functionality for sending messages to all group members
having a certain identity. Correspondingly, we call this feature 'group
multicast'. The service is using unicast when only one destination is
found, otherwise it will use the bearer broadcast service to transfer
the messages. In the latter case, the receiving members filter arriving
messages by looking at the intended destination instance. If there is
no match, the message will be dropped, while still being considered
received and read as seen by the flow control mechanism.
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Acked-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent ee106d7f
...@@ -413,10 +413,22 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, ...@@ -413,10 +413,22 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
if (!tipc_group_is_receiver(m)) if (!tipc_group_is_receiver(m))
goto drop; goto drop;
m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
/* Drop multicast here if not for this member */
if (mtyp == TIPC_GRP_MCAST_MSG) {
if (msg_nameinst(hdr) != grp->instance) {
m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
tipc_group_update_rcv_win(grp, msg_blocks(hdr),
node, port, xmitq);
kfree_skb(skb);
return;
}
}
TIPC_SKB_CB(skb)->orig_member = m->instance; TIPC_SKB_CB(skb)->orig_member = m->instance;
__skb_queue_tail(inputq, skb); __skb_queue_tail(inputq, skb);
m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
return; return;
drop: drop:
kfree_skb(skb); kfree_skb(skb);
......
...@@ -67,7 +67,8 @@ struct plist; ...@@ -67,7 +67,8 @@ struct plist;
#define TIPC_DIRECT_MSG 3 #define TIPC_DIRECT_MSG 3
#define TIPC_GRP_MEMBER_EVT 4 #define TIPC_GRP_MEMBER_EVT 4
#define TIPC_GRP_BCAST_MSG 5 #define TIPC_GRP_BCAST_MSG 5
#define TIPC_GRP_UCAST_MSG 6 #define TIPC_GRP_MCAST_MSG 6
#define TIPC_GRP_UCAST_MSG 7
/* /*
* Internal message users * Internal message users
...@@ -195,6 +196,11 @@ static inline u32 msg_size(struct tipc_msg *m) ...@@ -195,6 +196,11 @@ static inline u32 msg_size(struct tipc_msg *m)
return msg_bits(m, 0, 0, 0x1ffff); return msg_bits(m, 0, 0, 0x1ffff);
} }
static inline u32 msg_blocks(struct tipc_msg *m)
{
return (msg_size(m) / 1024) + 1;
}
static inline u32 msg_data_sz(struct tipc_msg *m) static inline u32 msg_data_sz(struct tipc_msg *m)
{ {
return msg_size(m) - msg_hdr_sz(m); return msg_size(m) - msg_hdr_sz(m);
...@@ -279,7 +285,8 @@ static inline u32 msg_mcast(struct tipc_msg *m) ...@@ -279,7 +285,8 @@ static inline u32 msg_mcast(struct tipc_msg *m)
{ {
int mtyp = msg_type(m); int mtyp = msg_type(m);
return ((mtyp == TIPC_MCAST_MSG) || (mtyp == TIPC_GRP_BCAST_MSG)); return ((mtyp == TIPC_MCAST_MSG) || (mtyp == TIPC_GRP_BCAST_MSG) ||
(mtyp == TIPC_GRP_MCAST_MSG));
} }
static inline u32 msg_connected(struct tipc_msg *m) static inline u32 msg_connected(struct tipc_msg *m)
......
...@@ -999,6 +999,7 @@ static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m, ...@@ -999,6 +999,7 @@ static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
int dlen, long timeout) int dlen, long timeout)
{ {
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
struct net *net = sock_net(sk); struct net *net = sock_net(sk);
struct tipc_sock *tsk = tipc_sk(sk); struct tipc_sock *tsk = tipc_sk(sk);
...@@ -1021,11 +1022,16 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, ...@@ -1021,11 +1022,16 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
return rc; return rc;
/* Complete message header */ /* Complete message header */
msg_set_type(hdr, TIPC_GRP_BCAST_MSG); if (dest) {
msg_set_type(hdr, TIPC_GRP_MCAST_MSG);
msg_set_nameinst(hdr, dest->addr.name.name.instance);
} else {
msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
msg_set_nameinst(hdr, 0);
}
msg_set_hdr_sz(hdr, GROUP_H_SIZE); msg_set_hdr_sz(hdr, GROUP_H_SIZE);
msg_set_destport(hdr, 0); msg_set_destport(hdr, 0);
msg_set_destnode(hdr, 0); msg_set_destnode(hdr, 0);
msg_set_nameinst(hdr, 0);
msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp)); msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp));
/* Build message as chain of buffers */ /* Build message as chain of buffers */
...@@ -1045,6 +1051,48 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, ...@@ -1045,6 +1051,48 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
return dlen; return dlen;
} }
/**
* tipc_send_group_mcast - send message to all members with given identity
* @sock: socket structure
* @m: message to send
* @dlen: total length of message data
* @timeout: timeout to wait for wakeup
*
* Called from function tipc_sendmsg(), which has done all sanity checks
* Returns the number of bytes sent on success, or errno
*/
static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
int dlen, long timeout)
{
struct sock *sk = sock->sk;
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
struct tipc_name_seq *seq = &dest->addr.nameseq;
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_group *grp = tsk->group;
struct net *net = sock_net(sk);
u32 domain, exclude, dstcnt;
struct list_head dsts;
INIT_LIST_HEAD(&dsts);
if (seq->lower != seq->upper)
return -ENOTSUPP;
domain = addr_domain(net, dest->scope);
exclude = tipc_group_exclude(grp);
if (!tipc_nametbl_lookup(net, seq->type, seq->lower, domain,
&dsts, &dstcnt, exclude, true))
return -EHOSTUNREACH;
if (dstcnt == 1) {
tipc_dest_pop(&dsts, &dest->addr.id.node, &dest->addr.id.ref);
return tipc_send_group_unicast(sock, m, dlen, timeout);
}
tipc_dest_list_purge(&dsts);
return tipc_send_group_bcast(sock, m, dlen, timeout);
}
/** /**
* tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
* @arrvq: queue with arriving messages, to be cloned after destination lookup * @arrvq: queue with arriving messages, to be cloned after destination lookup
...@@ -1213,6 +1261,8 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) ...@@ -1213,6 +1261,8 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
return tipc_send_group_anycast(sock, m, dlen, timeout); return tipc_send_group_anycast(sock, m, dlen, timeout);
if (dest->addrtype == TIPC_ADDR_ID) if (dest->addrtype == TIPC_ADDR_ID)
return tipc_send_group_unicast(sock, m, dlen, timeout); return tipc_send_group_unicast(sock, m, dlen, timeout);
if (dest->addrtype == TIPC_ADDR_MCAST)
return tipc_send_group_mcast(sock, m, dlen, timeout);
return -EINVAL; return -EINVAL;
} }
...@@ -2022,8 +2072,6 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, ...@@ -2022,8 +2072,6 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
if (unlikely(!msg_isdata(hdr))) if (unlikely(!msg_isdata(hdr)))
tipc_sk_proto_rcv(sk, &inputq, xmitq); tipc_sk_proto_rcv(sk, &inputq, xmitq);
else if (unlikely(msg_type(hdr) > TIPC_GRP_UCAST_MSG))
return kfree_skb(skb);
if (unlikely(grp)) if (unlikely(grp))
tipc_group_filter_msg(grp, &inputq, xmitq); tipc_group_filter_msg(grp, &inputq, xmitq);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment