Commit ee40d4bb authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc-next'

Jon Maloy says:

====================
tipc: multicast and internal users to new send functions

We move the remaining data transmit users: multicast, name table
distributor, and link internal protocols to use the new data
transmission framework introduced in a previous commit series
("tipc: new unicast transmission code").

Finally, we remove the code obsoleted by the new functions.

v2: - Fixed a braindead, but harmless return sequence in commit #3, as
      reported by David Miller.
    - Rebased series to 3.16.0-rc5+
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents a9f559c3 6f92ee54
/*
* net/tipc/bcast.c: TIPC broadcast code
*
* Copyright (c) 2004-2006, Ericsson AB
* Copyright (c) 2004-2006, 2014, Ericsson AB
* Copyright (c) 2004, Intel Corporation.
* Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
......@@ -38,6 +38,8 @@
#include "core.h"
#include "link.h"
#include "port.h"
#include "socket.h"
#include "msg.h"
#include "bcast.h"
#include "name_distr.h"
......@@ -138,6 +140,11 @@ static void tipc_bclink_unlock(void)
tipc_link_reset_all(node);
}
uint tipc_bclink_get_mtu(void)
{
return MAX_PKT_DEFAULT_MCAST;
}
void tipc_bclink_set_flags(unsigned int flags)
{
bclink->flags |= flags;
......@@ -382,30 +389,50 @@ static void bclink_peek_nack(struct tipc_msg *msg)
tipc_node_unlock(n_ptr);
}
/*
* tipc_bclink_xmit - broadcast a packet to all nodes in cluster
/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
* and to identified node local sockets
* @buf: chain of buffers containing message
* Consumes the buffer chain, except when returning -ELINKCONG
* Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
*/
int tipc_bclink_xmit(struct sk_buff *buf)
{
int res;
int rc = 0;
int bc = 0;
struct sk_buff *clbuf;
tipc_bclink_lock();
if (!bclink->bcast_nodes.count) {
res = msg_data_sz(buf_msg(buf));
kfree_skb(buf);
goto exit;
/* Prepare clone of message for local node */
clbuf = tipc_msg_reassemble(buf);
if (unlikely(!clbuf)) {
kfree_skb_list(buf);
return -EHOSTUNREACH;
}
res = __tipc_link_xmit(bcl, buf);
if (likely(res >= 0)) {
bclink_set_last_sent();
bcl->stats.queue_sz_counts++;
bcl->stats.accu_queue_sz += bcl->out_queue_size;
/* Broadcast to all other nodes */
if (likely(bclink)) {
tipc_bclink_lock();
if (likely(bclink->bcast_nodes.count)) {
rc = __tipc_link_xmit(bcl, buf);
if (likely(!rc)) {
bclink_set_last_sent();
bcl->stats.queue_sz_counts++;
bcl->stats.accu_queue_sz += bcl->out_queue_size;
}
bc = 1;
}
tipc_bclink_unlock();
}
exit:
tipc_bclink_unlock();
return res;
if (unlikely(!bc))
kfree_skb_list(buf);
/* Deliver message clone */
if (likely(!rc))
tipc_sk_mcast_rcv(clbuf);
else
kfree_skb(clbuf);
return rc;
}
/**
......@@ -443,7 +470,7 @@ void tipc_bclink_rcv(struct sk_buff *buf)
struct tipc_node *node;
u32 next_in;
u32 seqno;
int deferred;
int deferred = 0;
/* Screen out unwanted broadcast messages */
......@@ -494,7 +521,7 @@ void tipc_bclink_rcv(struct sk_buff *buf)
tipc_bclink_unlock();
tipc_node_unlock(node);
if (likely(msg_mcast(msg)))
tipc_port_mcast_rcv(buf, NULL);
tipc_sk_mcast_rcv(buf);
else
kfree_skb(buf);
} else if (msg_user(msg) == MSG_BUNDLER) {
......@@ -573,8 +600,7 @@ void tipc_bclink_rcv(struct sk_buff *buf)
node->bclink.deferred_size += deferred;
bclink_update_last_sent(node, seqno);
buf = NULL;
} else
deferred = 0;
}
tipc_bclink_lock();
......@@ -611,6 +637,7 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
struct tipc_media_addr *unused2)
{
int bp_index;
struct tipc_msg *msg = buf_msg(buf);
/* Prepare broadcast link message for reliable transmission,
* if first time trying to send it;
......@@ -618,10 +645,7 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
* since they are sent in an unreliable manner and don't need it
*/
if (likely(!msg_non_seq(buf_msg(buf)))) {
struct tipc_msg *msg;
bcbuf_set_acks(buf, bclink->bcast_nodes.count);
msg = buf_msg(buf);
msg_set_non_seq(msg, 1);
msg_set_mc_netid(msg, tipc_net_id);
bcl->stats.sent_info++;
......@@ -638,12 +662,14 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
struct tipc_bearer *b = p;
struct tipc_bearer *bp[2] = {p, s};
struct tipc_bearer *b = bp[msg_link_selector(msg)];
struct sk_buff *tbuf;
if (!p)
break; /* No more bearers to try */
if (!b)
b = p;
tipc_nmap_diff(&bcbearer->remains, &b->nodes,
&bcbearer->remains_new);
if (bcbearer->remains_new.count == bcbearer->remains.count)
......@@ -660,13 +686,6 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
tipc_bearer_send(b->identity, tbuf, &b->bcast_addr);
kfree_skb(tbuf); /* Bearer keeps a clone */
}
/* Swap bearers for next packet */
if (s) {
bcbearer->bpairs[bp_index].primary = s;
bcbearer->bpairs[bp_index].secondary = p;
}
if (bcbearer->remains_new.count == 0)
break; /* All targets reached */
......
/*
* net/tipc/bcast.h: Include file for TIPC broadcast code
*
* Copyright (c) 2003-2006, Ericsson AB
* Copyright (c) 2003-2006, 2014, Ericsson AB
* Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
......@@ -89,7 +89,6 @@ void tipc_bclink_add_node(u32 addr);
void tipc_bclink_remove_node(u32 addr);
struct tipc_node *tipc_bclink_retransmit_to(void);
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
int tipc_bclink_xmit(struct sk_buff *buf);
void tipc_bclink_rcv(struct sk_buff *buf);
u32 tipc_bclink_get_last_sent(void);
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr);
......@@ -98,5 +97,7 @@ int tipc_bclink_stats(char *stats_buf, const u32 buf_size);
int tipc_bclink_reset_stats(void);
int tipc_bclink_set_queue_limits(u32 limit);
void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action);
uint tipc_bclink_get_mtu(void);
int tipc_bclink_xmit(struct sk_buff *buf);
#endif
......@@ -85,7 +85,6 @@ static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
static void link_state_event(struct tipc_link *l_ptr, u32 event);
static void link_reset_statistics(struct tipc_link *l_ptr);
static void link_print(struct tipc_link *l_ptr, const char *str);
static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
static void tipc_link_sync_xmit(struct tipc_link *l);
static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf);
......@@ -679,180 +678,6 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
}
}
/*
* link_bundle_buf(): Append contents of a buffer to
* the tail of an existing one.
*/
static int link_bundle_buf(struct tipc_link *l_ptr, struct sk_buff *bundler,
struct sk_buff *buf)
{
struct tipc_msg *bundler_msg = buf_msg(bundler);
struct tipc_msg *msg = buf_msg(buf);
u32 size = msg_size(msg);
u32 bundle_size = msg_size(bundler_msg);
u32 to_pos = align(bundle_size);
u32 pad = to_pos - bundle_size;
if (msg_user(bundler_msg) != MSG_BUNDLER)
return 0;
if (msg_type(bundler_msg) != OPEN_MSG)
return 0;
if (skb_tailroom(bundler) < (pad + size))
return 0;
if (l_ptr->max_pkt < (to_pos + size))
return 0;
skb_put(bundler, pad + size);
skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
msg_set_size(bundler_msg, to_pos + size);
msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
kfree_skb(buf);
l_ptr->stats.sent_bundled++;
return 1;
}
static void link_add_to_outqueue(struct tipc_link *l_ptr,
struct sk_buff *buf,
struct tipc_msg *msg)
{
u32 ack = mod(l_ptr->next_in_no - 1);
u32 seqno = mod(l_ptr->next_out_no++);
msg_set_word(msg, 2, ((ack << 16) | seqno));
msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
buf->next = NULL;
if (l_ptr->first_out) {
l_ptr->last_out->next = buf;
l_ptr->last_out = buf;
} else
l_ptr->first_out = l_ptr->last_out = buf;
l_ptr->out_queue_size++;
if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
}
static void link_add_chain_to_outqueue(struct tipc_link *l_ptr,
struct sk_buff *buf_chain,
u32 long_msgno)
{
struct sk_buff *buf;
struct tipc_msg *msg;
if (!l_ptr->next_out)
l_ptr->next_out = buf_chain;
while (buf_chain) {
buf = buf_chain;
buf_chain = buf_chain->next;
msg = buf_msg(buf);
msg_set_long_msgno(msg, long_msgno);
link_add_to_outqueue(l_ptr, buf, msg);
}
}
/*
* tipc_link_xmit() is the 'full path' for messages, called from
* inside TIPC when the 'fast path' in tipc_send_xmit
* has failed, and from link_send()
*/
int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
u32 size = msg_size(msg);
u32 dsz = msg_data_sz(msg);
u32 queue_size = l_ptr->out_queue_size;
u32 imp = tipc_msg_tot_importance(msg);
u32 queue_limit = l_ptr->queue_limit[imp];
u32 max_packet = l_ptr->max_pkt;
/* Match msg importance against queue limits: */
if (unlikely(queue_size >= queue_limit)) {
if (imp <= TIPC_CRITICAL_IMPORTANCE) {
link_schedule_port(l_ptr, msg_origport(msg), size);
kfree_skb(buf);
return -ELINKCONG;
}
kfree_skb(buf);
if (imp > CONN_MANAGER) {
pr_warn("%s<%s>, send queue full", link_rst_msg,
l_ptr->name);
tipc_link_reset(l_ptr);
}
return dsz;
}
/* Fragmentation needed ? */
if (size > max_packet)
return tipc_link_frag_xmit(l_ptr, buf);
/* Packet can be queued or sent. */
if (likely(!link_congested(l_ptr))) {
link_add_to_outqueue(l_ptr, buf, msg);
tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
l_ptr->unacked_window = 0;
return dsz;
}
/* Congestion: can message be bundled ? */
if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
(msg_user(msg) != MSG_FRAGMENTER)) {
/* Try adding message to an existing bundle */
if (l_ptr->next_out &&
link_bundle_buf(l_ptr, l_ptr->last_out, buf))
return dsz;
/* Try creating a new bundle */
if (size <= max_packet * 2 / 3) {
struct sk_buff *bundler = tipc_buf_acquire(max_packet);
struct tipc_msg bundler_hdr;
if (bundler) {
tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
INT_H_SIZE, l_ptr->addr);
skb_copy_to_linear_data(bundler, &bundler_hdr,
INT_H_SIZE);
skb_trim(bundler, INT_H_SIZE);
link_bundle_buf(l_ptr, bundler, buf);
buf = bundler;
msg = buf_msg(buf);
l_ptr->stats.sent_bundles++;
}
}
}
if (!l_ptr->next_out)
l_ptr->next_out = buf;
link_add_to_outqueue(l_ptr, buf, msg);
return dsz;
}
/*
* tipc_link_xmit(): same as __tipc_link_xmit(), but the link to use
* has not been selected yet, and the the owner node is not locked
* Called by TIPC internal users, e.g. the name distributor
*/
int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
{
struct tipc_link *l_ptr;
struct tipc_node *n_ptr;
int res = -ELINKCONG;
n_ptr = tipc_node_find(dest);
if (n_ptr) {
tipc_node_lock(n_ptr);
l_ptr = n_ptr->active_links[selector & 1];
if (l_ptr)
res = __tipc_link_xmit(l_ptr, buf);
else
kfree_skb(buf);
tipc_node_unlock(n_ptr);
} else {
kfree_skb(buf);
}
return res;
}
/* tipc_link_cong: determine return value and how to treat the
* sent buffer during link congestion.
* - For plain, errorless user data messages we keep the buffer and
......@@ -881,7 +706,7 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
}
/**
* __tipc_link_xmit2(): same as tipc_link_xmit2, but destlink is known & locked
* __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
* @link: link to use
* @buf: chain of buffers containing message
* Consumes the buffer chain, except when returning -ELINKCONG
......@@ -890,7 +715,7 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
* Only the socket functions tipc_send_stream() and tipc_send_packet() need
* to act on the return value, since they may need to do more send attempts.
*/
int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf)
int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
uint psz = msg_size(msg);
......@@ -958,7 +783,7 @@ int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf)
}
/**
* tipc_link_xmit2() is the general link level function for message sending
* tipc_link_xmit() is the general link level function for message sending
* @buf: chain of buffers containing message
* @dsz: amount of user data to be sent
* @dnode: address of destination node
......@@ -966,7 +791,7 @@ int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf)
* Consumes the buffer chain, except when returning -ELINKCONG
* Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
*/
int tipc_link_xmit2(struct sk_buff *buf, u32 dnode, u32 selector)
int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
{
struct tipc_link *link = NULL;
struct tipc_node *node;
......@@ -977,7 +802,7 @@ int tipc_link_xmit2(struct sk_buff *buf, u32 dnode, u32 selector)
tipc_node_lock(node);
link = node->active_links[selector & 1];
if (link)
rc = __tipc_link_xmit2(link, buf);
rc = __tipc_link_xmit(link, buf);
tipc_node_unlock(node);
}
......@@ -999,7 +824,7 @@ int tipc_link_xmit2(struct sk_buff *buf, u32 dnode, u32 selector)
*
* Called with node locked
*/
static void tipc_link_sync_xmit(struct tipc_link *l)
static void tipc_link_sync_xmit(struct tipc_link *link)
{
struct sk_buff *buf;
struct tipc_msg *msg;
......@@ -1009,10 +834,9 @@ static void tipc_link_sync_xmit(struct tipc_link *l)
return;
msg = buf_msg(buf);
tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr);
msg_set_last_bcast(msg, l->owner->bclink.acked);
link_add_chain_to_outqueue(l, buf, 0);
tipc_link_push_queue(l);
tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
msg_set_last_bcast(msg, link->owner->bclink.acked);
__tipc_link_xmit(link, buf);
}
/*
......@@ -1032,47 +856,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
kfree_skb(buf);
}
/*
* tipc_link_names_xmit - send name table entries to new neighbor
*
* Send routine for bulk delivery of name table messages when contact
* with a new neighbor occurs. No link congestion checking is performed
* because name table messages *must* be delivered. The messages must be
* small enough not to require fragmentation.
* Called without any locks held.
*/
void tipc_link_names_xmit(struct list_head *message_list, u32 dest)
{
struct tipc_node *n_ptr;
struct tipc_link *l_ptr;
struct sk_buff *buf;
struct sk_buff *temp_buf;
if (list_empty(message_list))
return;
n_ptr = tipc_node_find(dest);
if (n_ptr) {
tipc_node_lock(n_ptr);
l_ptr = n_ptr->active_links[0];
if (l_ptr) {
/* convert circular list to linear list */
((struct sk_buff *)message_list->prev)->next = NULL;
link_add_chain_to_outqueue(l_ptr,
(struct sk_buff *)message_list->next, 0);
tipc_link_push_queue(l_ptr);
INIT_LIST_HEAD(message_list);
}
tipc_node_unlock(n_ptr);
}
/* discard the messages if they couldn't be sent */
list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
list_del((struct list_head *)buf);
kfree_skb(buf);
}
}
/*
* tipc_link_push_packet: Push one unsent packet to the media
*/
......@@ -2165,78 +1948,6 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
kfree_skb(buf);
}
/*
* Fragmentation/defragmentation:
*/
/*
* tipc_link_frag_xmit: Entry for buffers needing fragmentation.
* The buffer is complete, inclusive total message length.
* Returns user data length.
*/
static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
{
struct sk_buff *buf_chain = NULL;
struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;
struct tipc_msg *inmsg = buf_msg(buf);
struct tipc_msg fragm_hdr;
u32 insize = msg_size(inmsg);
u32 dsz = msg_data_sz(inmsg);
unchar *crs = buf->data;
u32 rest = insize;
u32 pack_sz = l_ptr->max_pkt;
u32 fragm_sz = pack_sz - INT_H_SIZE;
u32 fragm_no = 0;
u32 destaddr;
if (msg_short(inmsg))
destaddr = l_ptr->addr;
else
destaddr = msg_destnode(inmsg);
/* Prepare reusable fragment header: */
tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
INT_H_SIZE, destaddr);
/* Chop up message: */
while (rest > 0) {
struct sk_buff *fragm;
if (rest <= fragm_sz) {
fragm_sz = rest;
msg_set_type(&fragm_hdr, LAST_FRAGMENT);
}
fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
if (fragm == NULL) {
kfree_skb(buf);
kfree_skb_list(buf_chain);
return -ENOMEM;
}
msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
fragm_no++;
msg_set_fragm_no(&fragm_hdr, fragm_no);
skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
fragm_sz);
buf_chain_tail->next = fragm;
buf_chain_tail = fragm;
rest -= fragm_sz;
crs += fragm_sz;
msg_set_type(&fragm_hdr, FRAGMENT);
}
kfree_skb(buf);
/* Append chain of fragments to send queue & send them */
l_ptr->long_msg_seq_no++;
link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
l_ptr->stats.sent_fragments += fragm_no;
l_ptr->stats.sent_fragmented++;
tipc_link_push_queue(l_ptr);
return dsz;
}
static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
{
if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
......
......@@ -227,15 +227,8 @@ void tipc_link_reset_all(struct tipc_node *node);
void tipc_link_reset(struct tipc_link *l_ptr);
void tipc_link_reset_list(unsigned int bearer_id);
int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector);
int tipc_link_xmit2(struct sk_buff *buf, u32 dest, u32 selector);
void tipc_link_names_xmit(struct list_head *message_list, u32 dest);
int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf);
int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf);
u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
struct iovec const *msg_sect,
unsigned int len, u32 destnode);
void tipc_link_bundle_rcv(struct sk_buff *buf);
void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
u32 gap, u32 tolerance, u32 priority, u32 acked_mtu);
......
......@@ -60,41 +60,6 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
msg_set_destnode(m, destnode);
}
/**
* tipc_msg_build - create message using specified header and data
*
* Note: Caller must not hold any locks in case copy_from_user() is interrupted!
*
* Returns message data size or errno
*/
int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
unsigned int len, int max_size, struct sk_buff **buf)
{
int dsz, sz, hsz;
unsigned char *to;
dsz = len;
hsz = msg_hdr_sz(hdr);
sz = hsz + dsz;
msg_set_size(hdr, sz);
if (unlikely(sz > max_size)) {
*buf = NULL;
return dsz;
}
*buf = tipc_buf_acquire(sz);
if (!(*buf))
return -ENOMEM;
skb_copy_to_linear_data(*buf, hdr, hsz);
to = (*buf)->data + hsz;
if (len && memcpy_fromiovecend(to, msg_sect, 0, dsz)) {
kfree_skb(*buf);
*buf = NULL;
return -EFAULT;
}
return dsz;
}
/* tipc_buf_append(): Append a buffer to the fragment list of another buffer
* @*headbuf: in: NULL for first frag, otherwise value returned from prev call
* out: set when successful non-complete reassembly, otherwise NULL
......@@ -155,7 +120,7 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
/**
* tipc_msg_build2 - create buffer chain containing specified header and data
* tipc_msg_build - create buffer chain containing specified header and data
* @mhdr: Message header, to be prepended to data
* @iov: User data
* @offset: Posision in iov to start copying from
......@@ -164,8 +129,8 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
* @chain: Buffer or chain of buffers to be returned to caller
* Returns message data size or errno: -ENOMEM, -EFAULT
*/
int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov,
int offset, int dsz, int pktmax , struct sk_buff **chain)
int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
int offset, int dsz, int pktmax , struct sk_buff **chain)
{
int mhsz = msg_hdr_sz(mhdr);
int msz = mhsz + dsz;
......@@ -417,3 +382,38 @@ int tipc_msg_eval(struct sk_buff *buf, u32 *dnode)
msg_set_destport(msg, dport);
return TIPC_OK;
}
/* tipc_msg_reassemble() - clone a buffer chain of fragments and
* reassemble the clones into one message
*/
struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain)
{
struct sk_buff *buf = chain;
struct sk_buff *frag = buf;
struct sk_buff *head = NULL;
int hdr_sz;
/* Copy header if single buffer */
if (!buf->next) {
hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf));
return __pskb_copy(buf, hdr_sz, GFP_ATOMIC);
}
/* Clone all fragments and reassemble */
while (buf) {
frag = skb_clone(buf, GFP_ATOMIC);
if (!frag)
goto error;
frag->next = NULL;
if (tipc_buf_append(&head, &frag))
break;
if (!head)
goto error;
buf = buf->next;
}
return frag;
error:
pr_warn("Failed do clone local mcast rcv buffer\n");
kfree_skb(head);
return NULL;
}
......@@ -732,16 +732,15 @@ int tipc_msg_eval(struct sk_buff *buf, u32 *dnode);
void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
u32 destnode);
int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
unsigned int len, int max_size, struct sk_buff **buf);
int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu);
bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode);
int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov,
int offset, int dsz, int mtu , struct sk_buff **chain);
int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
int offset, int dsz, int mtu , struct sk_buff **chain);
struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain);
#endif
......@@ -101,24 +101,22 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)
void named_cluster_distribute(struct sk_buff *buf)
{
struct sk_buff *buf_copy;
struct tipc_node *n_ptr;
struct tipc_link *l_ptr;
struct sk_buff *obuf;
struct tipc_node *node;
u32 dnode;
rcu_read_lock();
list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
tipc_node_lock(n_ptr);
l_ptr = n_ptr->active_links[n_ptr->addr & 1];
if (l_ptr) {
buf_copy = skb_copy(buf, GFP_ATOMIC);
if (!buf_copy) {
tipc_node_unlock(n_ptr);
break;
}
msg_set_destnode(buf_msg(buf_copy), n_ptr->addr);
__tipc_link_xmit(l_ptr, buf_copy);
}
tipc_node_unlock(n_ptr);
list_for_each_entry_rcu(node, &tipc_node_list, list) {
dnode = node->addr;
if (in_own_node(dnode))
continue;
if (!tipc_node_active_links(node))
continue;
obuf = skb_copy(buf, GFP_ATOMIC);
if (!obuf)
break;
msg_set_destnode(buf_msg(obuf), dnode);
tipc_link_xmit(obuf, dnode, dnode);
}
rcu_read_unlock();
......@@ -175,34 +173,44 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ)
return buf;
}
/*
/**
* named_distribute - prepare name info for bulk distribution to another node
* @msg_list: list of messages (buffers) to be returned from this function
* @dnode: node to be updated
* @pls: linked list of publication items to be packed into buffer chain
*/
static void named_distribute(struct list_head *message_list, u32 node,
struct publ_list *pls, u32 max_item_buf)
static void named_distribute(struct list_head *msg_list, u32 dnode,
struct publ_list *pls)
{
struct publication *publ;
struct sk_buff *buf = NULL;
struct distr_item *item = NULL;
u32 left = 0;
u32 rest = pls->size * ITEM_SIZE;
uint dsz = pls->size * ITEM_SIZE;
uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
uint rem = dsz;
uint msg_rem = 0;
list_for_each_entry(publ, &pls->list, local_list) {
/* Prepare next buffer: */
if (!buf) {
left = (rest <= max_item_buf) ? rest : max_item_buf;
rest -= left;
buf = named_prepare_buf(PUBLICATION, left, node);
msg_rem = min_t(uint, rem, msg_dsz);
rem -= msg_rem;
buf = named_prepare_buf(PUBLICATION, msg_rem, dnode);
if (!buf) {
pr_warn("Bulk publication failure\n");
return;
}
item = (struct distr_item *)msg_data(buf_msg(buf));
}
/* Pack publication into message: */
publ_to_item(item, publ);
item++;
left -= ITEM_SIZE;
if (!left) {
list_add_tail((struct list_head *)buf, message_list);
msg_rem -= ITEM_SIZE;
/* Append full buffer to list: */
if (!msg_rem) {
list_add_tail((struct list_head *)buf, msg_list);
buf = NULL;
}
}
......@@ -211,16 +219,20 @@ static void named_distribute(struct list_head *message_list, u32 node,
/**
* tipc_named_node_up - tell specified node about all publications by this node
*/
void tipc_named_node_up(u32 max_item_buf, u32 node)
void tipc_named_node_up(u32 dnode)
{
LIST_HEAD(message_list);
LIST_HEAD(msg_list);
struct sk_buff *buf_chain;
read_lock_bh(&tipc_nametbl_lock);
named_distribute(&message_list, node, &publ_cluster, max_item_buf);
named_distribute(&message_list, node, &publ_zone, max_item_buf);
named_distribute(&msg_list, dnode, &publ_cluster);
named_distribute(&msg_list, dnode, &publ_zone);
read_unlock_bh(&tipc_nametbl_lock);
tipc_link_names_xmit(&message_list, node);
/* Convert circular list to linear list and send: */
buf_chain = (struct sk_buff *)msg_list.next;
((struct sk_buff *)msg_list.prev)->next = NULL;
tipc_link_xmit(buf_chain, dnode, dnode);
}
/**
......
......@@ -70,7 +70,7 @@ struct distr_item {
struct sk_buff *tipc_named_publish(struct publication *publ);
struct sk_buff *tipc_named_withdraw(struct publication *publ);
void named_cluster_distribute(struct sk_buff *buf);
void tipc_named_node_up(u32 max_item_buf, u32 node);
void tipc_named_node_up(u32 dnode);
void tipc_named_rcv(struct sk_buff *buf);
void tipc_named_reinit(void);
......
......@@ -474,8 +474,6 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len)
void tipc_node_unlock(struct tipc_node *node)
{
LIST_HEAD(nsub_list);
struct tipc_link *link;
int pkt_sz = 0;
u32 addr = 0;
if (likely(!node->action_flags)) {
......@@ -488,18 +486,13 @@ void tipc_node_unlock(struct tipc_node *node)
node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN;
}
if (node->action_flags & TIPC_NOTIFY_NODE_UP) {
link = node->active_links[0];
node->action_flags &= ~TIPC_NOTIFY_NODE_UP;
if (link) {
pkt_sz = ((link->max_pkt - INT_H_SIZE) / ITEM_SIZE) *
ITEM_SIZE;
addr = node->addr;
}
addr = node->addr;
}
spin_unlock_bh(&node->lock);
if (!list_empty(&nsub_list))
tipc_nodesub_notify(&nsub_list);
if (pkt_sz)
tipc_named_node_up(pkt_sz, addr);
if (addr)
tipc_named_node_up(addr);
}
......@@ -74,118 +74,6 @@ int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg)
(!peernode && (orignode == tipc_own_addr));
}
/**
* tipc_port_mcast_xmit - send a multicast message to local and remote
* destinations
*/
int tipc_port_mcast_xmit(struct tipc_port *oport,
struct tipc_name_seq const *seq,
struct iovec const *msg_sect,
unsigned int len)
{
struct tipc_msg *hdr;
struct sk_buff *buf;
struct sk_buff *ibuf = NULL;
struct tipc_port_list dports = {0, NULL, };
int ext_targets;
int res;
/* Create multicast message */
hdr = &oport->phdr;
msg_set_type(hdr, TIPC_MCAST_MSG);
msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
msg_set_destport(hdr, 0);
msg_set_destnode(hdr, 0);
msg_set_nametype(hdr, seq->type);
msg_set_namelower(hdr, seq->lower);
msg_set_nameupper(hdr, seq->upper);
msg_set_hdr_sz(hdr, MCAST_H_SIZE);
res = tipc_msg_build(hdr, msg_sect, len, MAX_MSG_SIZE, &buf);
if (unlikely(!buf))
return res;
/* Figure out where to send multicast message */
ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
TIPC_NODE_SCOPE, &dports);
/* Send message to destinations (duplicate it only if necessary) */
if (ext_targets) {
if (dports.count != 0) {
ibuf = skb_copy(buf, GFP_ATOMIC);
if (ibuf == NULL) {
tipc_port_list_free(&dports);
kfree_skb(buf);
return -ENOMEM;
}
}
res = tipc_bclink_xmit(buf);
if ((res < 0) && (dports.count != 0))
kfree_skb(ibuf);
} else {
ibuf = buf;
}
if (res >= 0) {
if (ibuf)
tipc_port_mcast_rcv(ibuf, &dports);
} else {
tipc_port_list_free(&dports);
}
return res;
}
/**
* tipc_port_mcast_rcv - deliver multicast message to all destination ports
*
* If there is no port list, perform a lookup to create one
*/
void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp)
{
struct tipc_msg *msg;
struct tipc_port_list dports = {0, NULL, };
struct tipc_port_list *item = dp;
int cnt = 0;
msg = buf_msg(buf);
/* Create destination port list, if one wasn't supplied */
if (dp == NULL) {
tipc_nametbl_mc_translate(msg_nametype(msg),
msg_namelower(msg),
msg_nameupper(msg),
TIPC_CLUSTER_SCOPE,
&dports);
item = dp = &dports;
}
/* Deliver a copy of message to each destination port */
if (dp->count != 0) {
msg_set_destnode(msg, tipc_own_addr);
if (dp->count == 1) {
msg_set_destport(msg, dp->ports[0]);
tipc_sk_rcv(buf);
tipc_port_list_free(dp);
return;
}
for (; cnt < dp->count; cnt++) {
int index = cnt % PLSIZE;
struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
if (b == NULL) {
pr_warn("Unable to deliver multicast message(s)\n");
goto exit;
}
if ((index == 0) && (cnt != 0))
item = item->next;
msg_set_destport(buf_msg(b), item->ports[index]);
tipc_sk_rcv(b);
}
}
exit:
kfree_skb(buf);
tipc_port_list_free(dp);
}
/* tipc_port_init - intiate TIPC port and lock it
*
* Returns obtained reference if initialization is successful, zero otherwise
......@@ -242,7 +130,7 @@ void tipc_port_destroy(struct tipc_port *p_ptr)
tipc_nodesub_unsubscribe(&p_ptr->subscription);
msg = buf_msg(buf);
peer = msg_destnode(msg);
tipc_link_xmit2(buf, peer, msg_link_selector(msg));
tipc_link_xmit(buf, peer, msg_link_selector(msg));
}
spin_lock_bh(&tipc_port_list_lock);
list_del(&p_ptr->port_list);
......@@ -299,7 +187,7 @@ static void port_timeout(unsigned long ref)
}
tipc_port_unlock(p_ptr);
msg = buf_msg(buf);
tipc_link_xmit2(buf, msg_destnode(msg), msg_link_selector(msg));
tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
}
......@@ -314,7 +202,7 @@ static void port_handle_node_down(unsigned long ref)
buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
tipc_port_unlock(p_ptr);
msg = buf_msg(buf);
tipc_link_xmit2(buf, msg_destnode(msg), msg_link_selector(msg));
tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
}
......@@ -459,7 +347,7 @@ void tipc_acknowledge(u32 ref, u32 ack)
if (!buf)
return;
msg = buf_msg(buf);
tipc_link_xmit2(buf, msg_destnode(msg), msg_link_selector(msg));
tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
}
int tipc_publish(struct tipc_port *p_ptr, unsigned int scope,
......@@ -621,6 +509,6 @@ int tipc_port_shutdown(u32 ref)
buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);
tipc_port_unlock(p_ptr);
msg = buf_msg(buf);
tipc_link_xmit2(buf, msg_destnode(msg), msg_link_selector(msg));
tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
return tipc_port_disconnect(ref);
}
......@@ -120,17 +120,7 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,
struct tipc_portid const *peer);
int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg);
/*
* TIPC messaging routines
*/
int tipc_port_mcast_xmit(struct tipc_port *port,
struct tipc_name_seq const *seq,
struct iovec const *msg,
unsigned int len);
struct sk_buff *tipc_port_get_ports(void);
void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp);
void tipc_port_reinit(void);
/**
......
......@@ -53,6 +53,7 @@ static void tipc_data_ready(struct sock *sk);
static void tipc_write_space(struct sock *sk);
static int tipc_release(struct socket *sock);
static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
......@@ -130,7 +131,7 @@ static void reject_rx_queue(struct sock *sk)
while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
tipc_link_xmit2(buf, dnode, 0);
tipc_link_xmit(buf, dnode, 0);
}
}
......@@ -340,7 +341,7 @@ static int tipc_release(struct socket *sock)
tipc_port_disconnect(port->ref);
}
if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
tipc_link_xmit2(buf, dnode, 0);
tipc_link_xmit(buf, dnode, 0);
}
}
......@@ -534,6 +535,98 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
return mask;
}
/**
* tipc_sendmcast - send multicast message
* @sock: socket structure
* @seq: destination address
* @iov: message data to send
* @dsz: total length of message data
* @timeo: timeout to wait for wakeup
*
* Called from function tipc_sendmsg(), which has done all sanity checks
* Returns the number of bytes sent on success, or errno
*/
static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
struct iovec *iov, size_t dsz, long timeo)
{
struct sock *sk = sock->sk;
struct tipc_msg *mhdr = &tipc_sk(sk)->port.phdr;
struct sk_buff *buf;
uint mtu;
int rc;
msg_set_type(mhdr, TIPC_MCAST_MSG);
msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
msg_set_destport(mhdr, 0);
msg_set_destnode(mhdr, 0);
msg_set_nametype(mhdr, seq->type);
msg_set_namelower(mhdr, seq->lower);
msg_set_nameupper(mhdr, seq->upper);
msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
new_mtu:
mtu = tipc_bclink_get_mtu();
rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf);
if (unlikely(rc < 0))
return rc;
do {
rc = tipc_bclink_xmit(buf);
if (likely(rc >= 0)) {
rc = dsz;
break;
}
if (rc == -EMSGSIZE)
goto new_mtu;
if (rc != -ELINKCONG)
break;
rc = tipc_wait_for_sndmsg(sock, &timeo);
if (rc)
kfree_skb_list(buf);
} while (!rc);
return rc;
}
/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
*/
void tipc_sk_mcast_rcv(struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
struct tipc_port_list dports = {0, NULL, };
struct tipc_port_list *item;
struct sk_buff *b;
uint i, last, dst = 0;
u32 scope = TIPC_CLUSTER_SCOPE;
if (in_own_node(msg_orignode(msg)))
scope = TIPC_NODE_SCOPE;
/* Create destination port list: */
tipc_nametbl_mc_translate(msg_nametype(msg),
msg_namelower(msg),
msg_nameupper(msg),
scope,
&dports);
last = dports.count;
if (!last) {
kfree_skb(buf);
return;
}
for (item = &dports; item; item = item->next) {
for (i = 0; i < PLSIZE && ++dst <= last; i++) {
b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
if (!b) {
pr_warn("Failed do clone mcast rcv buffer\n");
continue;
}
msg_set_destport(msg, item->ports[i]);
tipc_sk_rcv(b);
}
}
tipc_port_list_free(&dports);
}
/**
* tipc_sk_proto_rcv - receive a connection mng protocol message
* @tsk: receiving socket
......@@ -629,43 +722,6 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
return 0;
}
/**
* tipc_sendmcast - send multicast message
* @sock: socket structure
* @seq: destination address
* @iov: message data to send
* @dsz: total length of message data
* @timeo: timeout to wait for wakeup
*
* Called from function tipc_sendmsg(), which has done all sanity checks
* Returns the number of bytes sent on success, or errno
*/
static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
struct iovec *iov, size_t dsz, long timeo)
{
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
int rc;
do {
if (sock->state != SS_READY) {
rc = -EOPNOTSUPP;
break;
}
rc = tipc_port_mcast_xmit(&tsk->port, seq, iov, dsz);
if (likely(rc >= 0)) {
if (sock->state != SS_READY)
sock->state = SS_CONNECTING;
break;
}
if (rc != -ELINKCONG)
break;
rc = tipc_wait_for_sndmsg(sock, &timeo);
} while (!rc);
return rc;
}
/**
* tipc_sendmsg - send message in connectionless manner
* @iocb: if NULL, indicates that socket lock is already held
......@@ -765,12 +821,12 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
new_mtu:
mtu = tipc_node_get_mtu(dnode, tsk->port.ref);
rc = tipc_msg_build2(mhdr, iov, 0, dsz, mtu, &buf);
rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf);
if (rc < 0)
goto exit;
do {
rc = tipc_link_xmit2(buf, dnode, tsk->port.ref);
rc = tipc_link_xmit(buf, dnode, tsk->port.ref);
if (likely(rc >= 0)) {
if (sock->state != SS_READY)
sock->state = SS_CONNECTING;
......@@ -878,12 +934,12 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
next:
mtu = port->max_pkt;
send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
rc = tipc_msg_build2(mhdr, m->msg_iov, sent, send, mtu, &buf);
rc = tipc_msg_build(mhdr, m->msg_iov, sent, send, mtu, &buf);
if (unlikely(rc < 0))
goto exit;
do {
if (likely(!tipc_sk_conn_cong(tsk))) {
rc = tipc_link_xmit2(buf, dnode, ref);
rc = tipc_link_xmit(buf, dnode, ref);
if (likely(!rc)) {
tsk->sent_unacked++;
sent += send;
......@@ -1515,7 +1571,7 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
if ((rc < 0) && !tipc_msg_reverse(buf, &onode, -rc))
return 0;
tipc_link_xmit2(buf, onode, 0);
tipc_link_xmit(buf, onode, 0);
return 0;
}
......@@ -1567,7 +1623,7 @@ int tipc_sk_rcv(struct sk_buff *buf)
if ((rc < 0) && !tipc_msg_reverse(buf, &dnode, -rc))
return -EHOSTUNREACH;
tipc_link_xmit2(buf, dnode, 0);
tipc_link_xmit(buf, dnode, 0);
return (rc < 0) ? -EHOSTUNREACH : 0;
}
......@@ -1854,7 +1910,7 @@ static int tipc_shutdown(struct socket *sock, int how)
}
tipc_port_disconnect(port->ref);
if (tipc_msg_reverse(buf, &peer, TIPC_CONN_SHUTDOWN))
tipc_link_xmit2(buf, peer, 0);
tipc_link_xmit(buf, peer, 0);
} else {
tipc_port_shutdown(port->ref);
}
......
......@@ -85,4 +85,6 @@ static inline int tipc_sk_conn_cong(struct tipc_sock *tsk)
int tipc_sk_rcv(struct sk_buff *buf);
void tipc_sk_mcast_rcv(struct sk_buff *buf);
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment