Commit a386bff8 authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc_net-next' of git://git.kernel.org/pub/scm/linux/kernel/git/paulg/linux

Paul Gortmaker says:

====================
The most interesting thing here, at least from a user perspective,
is the broadcast link fix -- where there was a corner case where
two endpoints could get in a state where they disagree on where
to start Rx and ack of broadcast packets.

There is also the poll/wait changes which could also impact
end users for certain use cases - the fixes there also better
align tipc with the rest of the networking code.

The rest largely falls into routine cleanup category, by getting
rid of some unused routines, some Kconfig clutter, etc.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 2b916477 94fc9c47
......@@ -20,18 +20,9 @@ menuconfig TIPC
If in doubt, say N.
if TIPC
config TIPC_ADVANCED
bool "Advanced TIPC configuration"
default n
help
Saying Y here will open some advanced configuration for TIPC.
Most users do not need to bother; if unsure, just say N.
config TIPC_PORTS
int "Maximum number of ports in a node"
depends on TIPC_ADVANCED
depends on TIPC
range 127 65535
default "8191"
help
......@@ -40,5 +31,3 @@ config TIPC_PORTS
Setting this to a smaller value saves some memory,
setting it to higher allows for more ports.
endif # TIPC
......@@ -347,7 +347,7 @@ static void bclink_peek_nack(struct tipc_msg *msg)
tipc_node_lock(n_ptr);
if (n_ptr->bclink.supported &&
if (n_ptr->bclink.recv_permitted &&
(n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
(n_ptr->bclink.last_in == msg_bcgap_after(msg)))
n_ptr->bclink.oos_state = 2;
......@@ -429,7 +429,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
goto exit;
tipc_node_lock(node);
if (unlikely(!node->bclink.supported))
if (unlikely(!node->bclink.recv_permitted))
goto unlock;
/* Handle broadcast protocol message */
......@@ -564,7 +564,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
{
return (n_ptr->bclink.supported &&
return (n_ptr->bclink.recv_permitted &&
(tipc_bclink_get_last_sent() != n_ptr->bclink.acked));
}
......@@ -619,16 +619,14 @@ static int tipc_bcbearer_send(struct sk_buff *buf,
if (bcbearer->remains_new.count == bcbearer->remains.count)
continue; /* bearer pair doesn't add anything */
if (p->blocked ||
p->media->send_msg(buf, p, &p->media->bcast_addr)) {
if (!tipc_bearer_blocked(p))
tipc_bearer_send(p, buf, &p->media->bcast_addr);
else if (s && !tipc_bearer_blocked(s))
/* unable to send on primary bearer */
if (!s || s->blocked ||
s->media->send_msg(buf, s,
&s->media->bcast_addr)) {
/* unable to send on either bearer */
continue;
}
}
tipc_bearer_send(s, buf, &s->media->bcast_addr);
else
/* unable to send on either bearer */
continue;
if (s) {
bcbearer->bpairs[bp_index].primary = s;
......@@ -731,8 +729,8 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)
" TX naks:%u acks:%u dups:%u\n",
s->sent_nacks, s->sent_acks, s->retransmitted);
ret += tipc_snprintf(buf + ret, buf_size - ret,
" Congestion bearer:%u link:%u Send queue max:%u avg:%u\n",
s->bearer_congs, s->link_congs, s->max_queue_sz,
" Congestion link:%u Send queue max:%u avg:%u\n",
s->link_congs, s->max_queue_sz,
s->queue_sz_counts ?
(s->accu_queue_sz / s->queue_sz_counts) : 0);
......@@ -766,7 +764,6 @@ int tipc_bclink_set_queue_limits(u32 limit)
void tipc_bclink_init(void)
{
INIT_LIST_HEAD(&bcbearer->bearer.cong_links);
bcbearer->bearer.media = &bcbearer->media;
bcbearer->media.send_msg = tipc_bcbearer_send;
sprintf(bcbearer->media.name, "tipc-broadcast");
......
......@@ -279,115 +279,30 @@ void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest)
}
/*
* bearer_push(): Resolve bearer congestion. Force the waiting
* links to push out their unsent packets, one packet per link
* per iteration, until all packets are gone or congestion reoccurs.
* 'tipc_net_lock' is read_locked when this function is called
* bearer.lock must be taken before calling
* Returns binary true(1) ore false(0)
*/
static int bearer_push(struct tipc_bearer *b_ptr)
{
u32 res = 0;
struct tipc_link *ln, *tln;
if (b_ptr->blocked)
return 0;
while (!list_empty(&b_ptr->cong_links) && (res != PUSH_FAILED)) {
list_for_each_entry_safe(ln, tln, &b_ptr->cong_links, link_list) {
res = tipc_link_push_packet(ln);
if (res == PUSH_FAILED)
break;
if (res == PUSH_FINISHED)
list_move_tail(&ln->link_list, &b_ptr->links);
}
}
return list_empty(&b_ptr->cong_links);
}
void tipc_bearer_lock_push(struct tipc_bearer *b_ptr)
{
spin_lock_bh(&b_ptr->lock);
bearer_push(b_ptr);
spin_unlock_bh(&b_ptr->lock);
}
/*
* Interrupt enabling new requests after bearer congestion or blocking:
* Interrupt enabling new requests after bearer blocking:
* See bearer_send().
*/
void tipc_continue(struct tipc_bearer *b_ptr)
void tipc_continue(struct tipc_bearer *b)
{
spin_lock_bh(&b_ptr->lock);
if (!list_empty(&b_ptr->cong_links))
tipc_k_signal((Handler)tipc_bearer_lock_push, (unsigned long)b_ptr);
b_ptr->blocked = 0;
spin_unlock_bh(&b_ptr->lock);
spin_lock_bh(&b->lock);
b->blocked = 0;
spin_unlock_bh(&b->lock);
}
/*
* Schedule link for sending of messages after the bearer
* has been deblocked by 'continue()'. This method is called
* when somebody tries to send a message via this link while
* the bearer is congested. 'tipc_net_lock' is in read_lock here
* bearer.lock is busy
* tipc_bearer_blocked - determines if bearer is currently blocked
*/
static void tipc_bearer_schedule_unlocked(struct tipc_bearer *b_ptr,
struct tipc_link *l_ptr)
int tipc_bearer_blocked(struct tipc_bearer *b)
{
list_move_tail(&l_ptr->link_list, &b_ptr->cong_links);
}
/*
* Schedule link for sending of messages after the bearer
* has been deblocked by 'continue()'. This method is called
* when somebody tries to send a message via this link while
* the bearer is congested. 'tipc_net_lock' is in read_lock here,
* bearer.lock is free
*/
void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr)
{
spin_lock_bh(&b_ptr->lock);
tipc_bearer_schedule_unlocked(b_ptr, l_ptr);
spin_unlock_bh(&b_ptr->lock);
}
int res;
/*
* tipc_bearer_resolve_congestion(): Check if there is bearer congestion,
* and if there is, try to resolve it before returning.
* 'tipc_net_lock' is read_locked when this function is called
*/
int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr,
struct tipc_link *l_ptr)
{
int res = 1;
spin_lock_bh(&b->lock);
res = b->blocked;
spin_unlock_bh(&b->lock);
if (list_empty(&b_ptr->cong_links))
return 1;
spin_lock_bh(&b_ptr->lock);
if (!bearer_push(b_ptr)) {
tipc_bearer_schedule_unlocked(b_ptr, l_ptr);
res = 0;
}
spin_unlock_bh(&b_ptr->lock);
return res;
}
/**
* tipc_bearer_congested - determines if bearer is currently congested
*/
int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr)
{
if (unlikely(b_ptr->blocked))
return 1;
if (likely(list_empty(&b_ptr->cong_links)))
return 0;
return !tipc_bearer_resolve_congestion(b_ptr, l_ptr);
}
/**
* tipc_enable_bearer - enable bearer with the given name
*/
......@@ -489,7 +404,6 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
b_ptr->net_plane = bearer_id + 'A';
b_ptr->active = 1;
b_ptr->priority = priority;
INIT_LIST_HEAD(&b_ptr->cong_links);
INIT_LIST_HEAD(&b_ptr->links);
spin_lock_init(&b_ptr->lock);
......@@ -528,7 +442,6 @@ int tipc_block_bearer(const char *name)
pr_info("Blocking bearer <%s>\n", name);
spin_lock_bh(&b_ptr->lock);
b_ptr->blocked = 1;
list_splice_init(&b_ptr->cong_links, &b_ptr->links);
list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
struct tipc_node *n_ptr = l_ptr->owner;
......@@ -555,7 +468,6 @@ static void bearer_disable(struct tipc_bearer *b_ptr)
spin_lock_bh(&b_ptr->lock);
b_ptr->blocked = 1;
b_ptr->media->disable_bearer(b_ptr);
list_splice_init(&b_ptr->cong_links, &b_ptr->links);
list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
tipc_link_delete(l_ptr);
}
......
......@@ -120,7 +120,6 @@ struct tipc_media {
* @identity: array index of this bearer within TIPC bearer array
* @link_req: ptr to (optional) structure making periodic link setup requests
* @links: list of non-congested links associated with bearer
* @cong_links: list of congested links associated with bearer
* @active: non-zero if bearer structure is represents a bearer
* @net_plane: network plane ('A' through 'H') currently associated with bearer
* @nodes: indicates which nodes in cluster can be reached through bearer
......@@ -143,7 +142,6 @@ struct tipc_bearer {
u32 identity;
struct tipc_link_req *link_req;
struct list_head links;
struct list_head cong_links;
int active;
char net_plane;
struct tipc_node_map nodes;
......@@ -185,39 +183,23 @@ struct sk_buff *tipc_media_get_names(void);
struct sk_buff *tipc_bearer_get_names(void);
void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest);
void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest);
void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr);
struct tipc_bearer *tipc_bearer_find(const char *name);
struct tipc_bearer *tipc_bearer_find_interface(const char *if_name);
struct tipc_media *tipc_media_find(const char *name);
int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr,
struct tipc_link *l_ptr);
int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr);
int tipc_bearer_blocked(struct tipc_bearer *b_ptr);
void tipc_bearer_stop(void);
void tipc_bearer_lock_push(struct tipc_bearer *b_ptr);
/**
* tipc_bearer_send- sends buffer to destination over bearer
*
* Returns true (1) if successful, or false (0) if unable to send
*
* IMPORTANT:
* The media send routine must not alter the buffer being passed in
* as it may be needed for later retransmission!
*
* If the media send routine returns a non-zero value (indicating that
* it was unable to send the buffer), it must:
* 1) mark the bearer as blocked,
* 2) call tipc_continue() once the bearer is able to send again.
* Media types that are unable to meet these two critera must ensure their
* send routine always returns success -- even if the buffer was not sent --
* and let TIPC's link code deal with the undelivered message.
*/
static inline int tipc_bearer_send(struct tipc_bearer *b_ptr,
struct sk_buff *buf,
static inline void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf,
struct tipc_media_addr *dest)
{
return !b_ptr->media->send_msg(buf, b_ptr, dest);
b->media->send_msg(buf, b, dest);
}
#endif /* _TIPC_BEARER_H */
......@@ -42,11 +42,6 @@
#include <linux/module.h>
#ifndef CONFIG_TIPC_PORTS
#define CONFIG_TIPC_PORTS 8191
#endif
/* global variables used by multiple sub-systems within TIPC */
int tipc_random __read_mostly;
......
......@@ -243,7 +243,7 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
if ((type == DSC_REQ_MSG) && !link_fully_up && !b_ptr->blocked) {
rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr);
if (rbuf) {
b_ptr->media->send_msg(rbuf, b_ptr, &media_addr);
tipc_bearer_send(b_ptr, rbuf, &media_addr);
kfree_skb(rbuf);
}
}
......
/*
* net/tipc/link.c: TIPC link code
*
* Copyright (c) 1996-2007, Ericsson AB
* Copyright (c) 1996-2007, 2012, Ericsson AB
* Copyright (c) 2004-2007, 2010-2011, Wind River Systems
* All rights reserved.
*
......@@ -103,6 +103,8 @@ static void link_reset_statistics(struct tipc_link *l_ptr);
static void link_print(struct tipc_link *l_ptr, const char *str);
static void link_start(struct tipc_link *l_ptr);
static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
static void tipc_link_send_sync(struct tipc_link *l);
static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf);
/*
* Simple link routines
......@@ -712,6 +714,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
link_activate(l_ptr);
tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
l_ptr->fsm_msg_cnt++;
if (l_ptr->owner->working_links == 1)
tipc_link_send_sync(l_ptr);
link_set_timer(l_ptr, cont_intv);
break;
case RESET_MSG:
......@@ -745,6 +749,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
link_activate(l_ptr);
tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
l_ptr->fsm_msg_cnt++;
if (l_ptr->owner->working_links == 1)
tipc_link_send_sync(l_ptr);
link_set_timer(l_ptr, cont_intv);
break;
case RESET_MSG:
......@@ -872,17 +878,12 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
return link_send_long_buf(l_ptr, buf);
/* Packet can be queued or sent. */
if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) &&
!link_congested(l_ptr))) {
link_add_to_outqueue(l_ptr, buf, msg);
if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) {
l_ptr->unacked_window = 0;
} else {
tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
l_ptr->stats.bearer_congs++;
l_ptr->next_out = buf;
}
tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
l_ptr->unacked_window = 0;
return dsz;
}
/* Congestion: can message be bundled ? */
......@@ -891,10 +892,8 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
/* Try adding message to an existing bundle */
if (l_ptr->next_out &&
link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
link_bundle_buf(l_ptr, l_ptr->last_out, buf))
return dsz;
}
/* Try creating a new bundle */
if (size <= max_packet * 2 / 3) {
......@@ -917,7 +916,6 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
if (!l_ptr->next_out)
l_ptr->next_out = buf;
link_add_to_outqueue(l_ptr, buf, msg);
tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
return dsz;
}
......@@ -949,7 +947,48 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
return res;
}
/**
/*
* tipc_link_send_sync - synchronize broadcast link endpoints.
*
* Give a newly added peer node the sequence number where it should
* start receiving and acking broadcast packets.
*
* Called with node locked
*/
static void tipc_link_send_sync(struct tipc_link *l)
{
struct sk_buff *buf;
struct tipc_msg *msg;
buf = tipc_buf_acquire(INT_H_SIZE);
if (!buf)
return;
msg = buf_msg(buf);
tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr);
msg_set_last_bcast(msg, l->owner->bclink.acked);
link_add_chain_to_outqueue(l, buf, 0);
tipc_link_push_queue(l);
}
/*
* tipc_link_recv_sync - synchronize broadcast link endpoints.
* Receive the sequence number where we should start receiving and
* acking broadcast packets from a newly added peer node, and open
* up for reception of such packets.
*
* Called with node locked
*/
static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
n->bclink.recv_permitted = true;
kfree_skb(buf);
}
/*
* tipc_link_send_names - send name table entries to new neighbor
*
* Send routine for bulk delivery of name table messages when contact
......@@ -1006,16 +1045,11 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
if (likely(!link_congested(l_ptr))) {
if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
if (likely(list_empty(&l_ptr->b_ptr->cong_links))) {
if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) {
link_add_to_outqueue(l_ptr, buf, msg);
if (likely(tipc_bearer_send(l_ptr->b_ptr, buf,
&l_ptr->media_addr))) {
l_ptr->unacked_window = 0;
return res;
}
tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
l_ptr->stats.bearer_congs++;
l_ptr->next_out = buf;
tipc_bearer_send(l_ptr->b_ptr, buf,
&l_ptr->media_addr);
l_ptr->unacked_window = 0;
return res;
}
} else
......@@ -1106,7 +1140,7 @@ int tipc_link_send_sections_fast(struct tipc_port *sender,
/* Exit if link (or bearer) is congested */
if (link_congested(l_ptr) ||
!list_empty(&l_ptr->b_ptr->cong_links)) {
tipc_bearer_blocked(l_ptr->b_ptr)) {
res = link_schedule_port(l_ptr,
sender->ref, res);
goto exit;
......@@ -1329,15 +1363,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
if (r_q_size && buf) {
msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
l_ptr->retransm_queue_head = mod(++r_q_head);
l_ptr->retransm_queue_size = --r_q_size;
l_ptr->stats.retransmitted++;
return 0;
} else {
l_ptr->stats.bearer_congs++;
return PUSH_FAILED;
}
tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
l_ptr->retransm_queue_head = mod(++r_q_head);
l_ptr->retransm_queue_size = --r_q_size;
l_ptr->stats.retransmitted++;
return 0;
}
/* Send deferred protocol message, if any: */
......@@ -1345,15 +1375,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
if (buf) {
msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
l_ptr->unacked_window = 0;
kfree_skb(buf);
l_ptr->proto_msg_queue = NULL;
return 0;
} else {
l_ptr->stats.bearer_congs++;
return PUSH_FAILED;
}
tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
l_ptr->unacked_window = 0;
kfree_skb(buf);
l_ptr->proto_msg_queue = NULL;
return 0;
}
/* Send one deferred data message, if send window not full: */
......@@ -1366,18 +1392,14 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
if (mod(next - first) < l_ptr->queue_limit[0]) {
msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
if (msg_user(msg) == MSG_BUNDLER)
msg_set_type(msg, CLOSED_MSG);
l_ptr->next_out = buf->next;
return 0;
} else {
l_ptr->stats.bearer_congs++;
return PUSH_FAILED;
}
tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
if (msg_user(msg) == MSG_BUNDLER)
msg_set_type(msg, CLOSED_MSG);
l_ptr->next_out = buf->next;
return 0;
}
}
return PUSH_FINISHED;
return 1;
}
/*
......@@ -1388,15 +1410,12 @@ void tipc_link_push_queue(struct tipc_link *l_ptr)
{
u32 res;
if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr))
if (tipc_bearer_blocked(l_ptr->b_ptr))
return;
do {
res = tipc_link_push_packet(l_ptr);
} while (!res);
if (res == PUSH_FAILED)
tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
}
static void link_reset_all(unsigned long addr)
......@@ -1454,9 +1473,8 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
tipc_addr_string_fill(addr_string, n_ptr->addr);
pr_info("Broadcast link info for %s\n", addr_string);
pr_info("Supportable: %d, Supported: %d, Acked: %u\n",
n_ptr->bclink.supportable,
n_ptr->bclink.supported,
pr_info("Reception permitted: %d, Acked: %u\n",
n_ptr->bclink.recv_permitted,
n_ptr->bclink.acked);
pr_info("Last in: %u, Oos state: %u, Last sent: %u\n",
n_ptr->bclink.last_in,
......@@ -1481,7 +1499,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
msg = buf_msg(buf);
if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
if (tipc_bearer_blocked(l_ptr->b_ptr)) {
if (l_ptr->retransm_queue_size == 0) {
l_ptr->retransm_queue_head = msg_seqno(msg);
l_ptr->retransm_queue_size = retransmits;
......@@ -1491,7 +1509,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
}
return;
} else {
/* Detect repeated retransmit failures on uncongested bearer */
/* Detect repeated retransmit failures on unblocked bearer */
if (l_ptr->last_retransmitted == msg_seqno(msg)) {
if (++l_ptr->stale_count > 100) {
link_retransmit_failure(l_ptr, buf);
......@@ -1507,17 +1525,10 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
msg = buf_msg(buf);
msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
buf = buf->next;
retransmits--;
l_ptr->stats.retransmitted++;
} else {
tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
l_ptr->stats.bearer_congs++;
l_ptr->retransm_queue_head = buf_seqno(buf);
l_ptr->retransm_queue_size = retransmits;
return;
}
tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
buf = buf->next;
retransmits--;
l_ptr->stats.retransmitted++;
}
l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
......@@ -1676,7 +1687,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
ackd = msg_ack(msg);
/* Release acked messages */
if (n_ptr->bclink.supported)
if (n_ptr->bclink.recv_permitted)
tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
crs = l_ptr->first_out;
......@@ -1727,9 +1738,14 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
tipc_link_recv_bundle(buf);
continue;
case NAME_DISTRIBUTOR:
n_ptr->bclink.recv_permitted = true;
tipc_node_unlock(n_ptr);
tipc_named_recv(buf);
continue;
case BCAST_PROTOCOL:
tipc_link_recv_sync(n_ptr, buf);
tipc_node_unlock(n_ptr);
continue;
case CONN_MANAGER:
tipc_node_unlock(n_ptr);
tipc_port_recv_proto_msg(buf);
......@@ -1772,16 +1788,19 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
continue;
}
/* Link is not in state WORKING_WORKING */
if (msg_user(msg) == LINK_PROTOCOL) {
link_recv_proto_msg(l_ptr, buf);
head = link_insert_deferred_queue(l_ptr, head);
tipc_node_unlock(n_ptr);
continue;
}
/* Traffic message. Conditionally activate link */
link_state_event(l_ptr, TRAFFIC_MSG_EVT);
if (link_working_working(l_ptr)) {
/* Re-insert in front of queue */
/* Re-insert buffer in front of queue */
buf->next = head;
head = buf;
tipc_node_unlock(n_ptr);
......@@ -1972,21 +1991,13 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
/* Defer message if bearer is already congested */
if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
l_ptr->proto_msg_queue = buf;
return;
}
/* Defer message if attempting to send results in bearer congestion */
if (!tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
/* Defer message if bearer is already blocked */
if (tipc_bearer_blocked(l_ptr->b_ptr)) {
l_ptr->proto_msg_queue = buf;
l_ptr->stats.bearer_congs++;
return;
}
/* Discard message if it was sent successfully */
tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
l_ptr->unacked_window = 0;
kfree_skb(buf);
}
......@@ -2057,7 +2068,6 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
} else {
l_ptr->max_pkt = l_ptr->max_pkt_target;
}
l_ptr->owner->bclink.supportable = (max_pkt_info != 0);
/* Synchronize broadcast link info, if not done previously */
if (!tipc_node_is_up(l_ptr->owner)) {
......@@ -2112,7 +2122,7 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
}
/* Protocol message before retransmits, reduce loss risk */
if (l_ptr->owner->bclink.supported)
if (l_ptr->owner->bclink.recv_permitted)
tipc_bclink_update_link_state(l_ptr->owner,
msg_last_bcast(msg));
......@@ -2937,8 +2947,8 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
s->sent_nacks, s->sent_acks, s->retransmitted);
ret += tipc_snprintf(buf + ret, buf_size - ret,
" Congestion bearer:%u link:%u Send queue"
" max:%u avg:%u\n", s->bearer_congs, s->link_congs,
" Congestion link:%u Send queue"
" max:%u avg:%u\n", s->link_congs,
s->max_queue_sz, s->queue_sz_counts ?
(s->accu_queue_sz / s->queue_sz_counts) : 0);
......
......@@ -40,9 +40,6 @@
#include "msg.h"
#include "node.h"
#define PUSH_FAILED 1
#define PUSH_FINISHED 2
/*
* Out-of-range value for link sequence numbers
*/
......@@ -82,7 +79,6 @@ struct tipc_stats {
u32 recv_fragmented;
u32 recv_fragments;
u32 link_congs; /* # port sends blocked by congestion */
u32 bearer_congs;
u32 deferred_recv;
u32 duplicates;
u32 max_queue_sz; /* send queue size high water mark */
......
......@@ -262,7 +262,7 @@ void tipc_named_node_up(unsigned long nodearg)
named_distribute(&message_list, node, &publ_zone, max_item_buf);
read_unlock_bh(&tipc_nametbl_lock);
tipc_link_send_names(&message_list, (u32)node);
tipc_link_send_names(&message_list, node);
}
/**
......
/*
* net/tipc/node.c: TIPC node management routines
*
* Copyright (c) 2000-2006, Ericsson AB
* Copyright (c) 2000-2006, 2012 Ericsson AB
* Copyright (c) 2005-2006, 2010-2011, Wind River Systems
* All rights reserved.
*
......@@ -263,12 +263,9 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
static void node_established_contact(struct tipc_node *n_ptr)
{
tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr);
if (n_ptr->bclink.supportable) {
n_ptr->bclink.acked = tipc_bclink_get_last_sent();
tipc_bclink_add_node(n_ptr->addr);
n_ptr->bclink.supported = 1;
}
n_ptr->bclink.oos_state = 0;
n_ptr->bclink.acked = tipc_bclink_get_last_sent();
tipc_bclink_add_node(n_ptr->addr);
}
static void node_name_purge_complete(unsigned long node_addr)
......@@ -294,7 +291,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
tipc_addr_string_fill(addr_string, n_ptr->addr));
/* Flush broadcast link info associated with lost node */
if (n_ptr->bclink.supported) {
if (n_ptr->bclink.recv_permitted) {
while (n_ptr->bclink.deferred_head) {
struct sk_buff *buf = n_ptr->bclink.deferred_head;
n_ptr->bclink.deferred_head = buf->next;
......@@ -310,7 +307,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
tipc_bclink_remove_node(n_ptr->addr);
tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);
n_ptr->bclink.supported = 0;
n_ptr->bclink.recv_permitted = false;
}
/* Abort link changeover */
......
......@@ -67,8 +67,6 @@
* @permit_changeover: non-zero if node has redundant links to this system
* @signature: node instance identifier
* @bclink: broadcast-related info
* @supportable: non-zero if node supports TIPC b'cast link capability
* @supported: non-zero if node supports TIPC b'cast capability
* @acked: sequence # of last outbound b'cast message acknowledged by node
* @last_in: sequence # of last in-sequence b'cast message received from node
* @last_sent: sequence # of last b'cast message sent by node
......@@ -77,6 +75,7 @@
* @deferred_head: oldest OOS b'cast message received from node
* @deferred_tail: newest OOS b'cast message received from node
* @defragm: list of partially reassembled b'cast message fragments from node
* @recv_permitted: true if node is allowed to receive b'cast messages
*/
struct tipc_node {
u32 addr;
......@@ -92,8 +91,6 @@ struct tipc_node {
int permit_changeover;
u32 signature;
struct {
u8 supportable;
u8 supported;
u32 acked;
u32 last_in;
u32 last_sent;
......@@ -102,6 +99,7 @@ struct tipc_node {
struct sk_buff *deferred_head;
struct sk_buff *deferred_tail;
struct sk_buff *defragm;
bool recv_permitted;
} bclink;
};
......
......@@ -62,6 +62,8 @@ struct tipc_sock {
static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
static void wakeupdispatch(struct tipc_port *tport);
static void tipc_data_ready(struct sock *sk, int len);
static void tipc_write_space(struct sock *sk);
static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
......@@ -221,6 +223,8 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
sock_init_data(sock, sk);
sk->sk_backlog_rcv = backlog_rcv;
sk->sk_rcvbuf = TIPC_FLOW_CONTROL_WIN * 2 * TIPC_MAX_USER_MSG_SIZE * 2;
sk->sk_data_ready = tipc_data_ready;
sk->sk_write_space = tipc_write_space;
tipc_sk(sk)->p = tp_ptr;
tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;
......@@ -408,7 +412,7 @@ static int get_name(struct socket *sock, struct sockaddr *uaddr,
* socket state flags set
* ------------ ---------
* unconnected no read flags
* no write flags
* POLLOUT if port is not congested
*
* connecting POLLIN/POLLRDNORM if ACK/NACK in rx queue
* no write flags
......@@ -435,9 +439,13 @@ static unsigned int poll(struct file *file, struct socket *sock,
struct sock *sk = sock->sk;
u32 mask = 0;
poll_wait(file, sk_sleep(sk), wait);
sock_poll_wait(file, sk_sleep(sk), wait);
switch ((int)sock->state) {
case SS_UNCONNECTED:
if (!tipc_sk_port(sk)->congested)
mask |= POLLOUT;
break;
case SS_READY:
case SS_CONNECTED:
if (!tipc_sk_port(sk)->congested)
......@@ -1125,6 +1133,39 @@ static int recv_stream(struct kiocb *iocb, struct socket *sock,
return sz_copied ? sz_copied : res;
}
/**
* tipc_write_space - wake up thread if port congestion is released
* @sk: socket
*/
static void tipc_write_space(struct sock *sk)
{
struct socket_wq *wq;
rcu_read_lock();
wq = rcu_dereference(sk->sk_wq);
if (wq_has_sleeper(wq))
wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
POLLWRNORM | POLLWRBAND);
rcu_read_unlock();
}
/**
* tipc_data_ready - wake up threads to indicate messages have been received
* @sk: socket
* @len: the length of messages
*/
static void tipc_data_ready(struct sock *sk, int len)
{
struct socket_wq *wq;
rcu_read_lock();
wq = rcu_dereference(sk->sk_wq);
if (wq_has_sleeper(wq))
wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
POLLRDNORM | POLLRDBAND);
rcu_read_unlock();
}
/**
* rx_queue_full - determine if receive queue can accept another message
* @msg: message to be added to queue
......@@ -1222,8 +1263,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
tipc_disconnect_port(tipc_sk_port(sk));
}
if (waitqueue_active(sk_sleep(sk)))
wake_up_interruptible(sk_sleep(sk));
sk->sk_data_ready(sk, 0);
return TIPC_OK;
}
......@@ -1290,8 +1330,7 @@ static void wakeupdispatch(struct tipc_port *tport)
{
struct sock *sk = (struct sock *)tport->usr_handle;
if (waitqueue_active(sk_sleep(sk)))
wake_up_interruptible(sk_sleep(sk));
sk->sk_write_space(sk);
}
/**
......@@ -1556,10 +1595,11 @@ static int shutdown(struct socket *sock, int how)
case SS_DISCONNECTING:
/* Discard any unreceived messages; wake up sleeping tasks */
/* Discard any unreceived messages */
discard_rx_queue(sk);
if (waitqueue_active(sk_sleep(sk)))
wake_up_interruptible(sk_sleep(sk));
/* Wake up anyone sleeping in poll */
sk->sk_state_change(sk);
res = 0;
break;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment