Commit 95ed4019 authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc_fragmentation'

Erik Hugne says:

====================
tipc: message reassembly using fragment chain

We introduce a new reassembly algorithm that improves performance
and eliminates the risk of causing out-of-memory situations.

v3: -Use skb_try_coalesce, and revert to fraglist if this does not succeed.
    -Make sure reassembly list head is uncloned.

v2: -Rebased on Ying's indentation fix.
    -Node unlock call in msg_fragmenter case moved from patch #2 to #1.
     ('continue' with this lock held would cause spinlock recursion if only
      patch #1 is used)
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents b0db7b0c a715b49e
......@@ -480,18 +480,24 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
tipc_node_unlock(node);
tipc_link_recv_bundle(buf);
} else if (msg_user(msg) == MSG_FRAGMENTER) {
int ret = tipc_link_recv_fragment(&node->bclink.defragm,
&buf, &msg);
if (ret < 0)
int ret;
ret = tipc_link_recv_fragment(&node->bclink.reasm_head,
&node->bclink.reasm_tail,
&buf);
if (ret == LINK_REASM_ERROR)
goto unlock;
spin_lock_bh(&bc_lock);
bclink_accept_pkt(node, seqno);
bcl->stats.recv_fragments++;
if (ret > 0)
if (ret == LINK_REASM_COMPLETE) {
bcl->stats.recv_fragmented++;
/* Point msg to inner header */
msg = buf_msg(buf);
spin_unlock_bh(&bc_lock);
goto receive;
}
spin_unlock_bh(&bc_lock);
tipc_node_unlock(node);
tipc_net_route_msg(buf);
} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
spin_lock_bh(&bc_lock);
bclink_accept_pkt(node, seqno);
......
......@@ -404,15 +404,9 @@ static void link_release_outqueue(struct tipc_link *l_ptr)
*/
void tipc_link_reset_fragments(struct tipc_link *l_ptr)
{
struct sk_buff *buf = l_ptr->defragm_buf;
struct sk_buff *next;
while (buf) {
next = buf->next;
kfree_skb(buf);
buf = next;
}
l_ptr->defragm_buf = NULL;
kfree_skb(l_ptr->reasm_head);
l_ptr->reasm_head = NULL;
l_ptr->reasm_tail = NULL;
}
/**
......@@ -1649,15 +1643,18 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
continue;
case MSG_FRAGMENTER:
l_ptr->stats.recv_fragments++;
ret = tipc_link_recv_fragment(&l_ptr->defragm_buf,
&buf, &msg);
if (ret == 1) {
ret = tipc_link_recv_fragment(&l_ptr->reasm_head,
&l_ptr->reasm_tail,
&buf);
if (ret == LINK_REASM_COMPLETE) {
l_ptr->stats.recv_fragmented++;
msg = buf_msg(buf);
goto deliver;
}
if (ret == -1)
l_ptr->next_in_no--;
break;
if (ret == LINK_REASM_ERROR)
tipc_link_reset(l_ptr);
tipc_node_unlock(n_ptr);
continue;
case CHANGEOVER_PROTOCOL:
type = msg_type(msg);
if (link_recv_changeover_msg(&l_ptr, &buf)) {
......@@ -2341,115 +2338,48 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
return dsz;
}
/*
* A pending message being re-assembled must store certain values
* to handle subsequent fragments correctly. The following functions
* help storing these values in unused, available fields in the
* pending message. This makes dynamic memory allocation unnecessary.
*/
static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
{
msg_set_seqno(buf_msg(buf), seqno);
}
static u32 get_fragm_size(struct sk_buff *buf)
{
return msg_ack(buf_msg(buf));
}
static void set_fragm_size(struct sk_buff *buf, u32 sz)
{
msg_set_ack(buf_msg(buf), sz);
}
static u32 get_expected_frags(struct sk_buff *buf)
{
return msg_bcast_ack(buf_msg(buf));
}
static void set_expected_frags(struct sk_buff *buf, u32 exp)
{
msg_set_bcast_ack(buf_msg(buf), exp);
}
/*
* tipc_link_recv_fragment(): Called with node lock on. Returns
* the reassembled buffer if message is complete.
*/
int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
struct tipc_msg **m)
{
struct sk_buff *prev = NULL;
struct sk_buff *fbuf = *fb;
struct tipc_msg *fragm = buf_msg(fbuf);
struct sk_buff *pbuf = *pending;
u32 long_msg_seq_no = msg_long_msgno(fragm);
*fb = NULL;
/* Is there an incomplete message waiting for this fragment? */
while (pbuf && ((buf_seqno(pbuf) != long_msg_seq_no) ||
(msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
prev = pbuf;
pbuf = pbuf->next;
}
if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
u32 msg_sz = msg_size(imsg);
u32 fragm_sz = msg_data_sz(fragm);
u32 exp_fragm_cnt;
u32 max = TIPC_MAX_USER_MSG_SIZE + NAMED_H_SIZE;
if (msg_type(imsg) == TIPC_MCAST_MSG)
max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
if (fragm_sz == 0 || msg_size(imsg) > max) {
kfree_skb(fbuf);
return 0;
}
exp_fragm_cnt = msg_sz / fragm_sz + !!(msg_sz % fragm_sz);
pbuf = tipc_buf_acquire(msg_size(imsg));
if (pbuf != NULL) {
pbuf->next = *pending;
*pending = pbuf;
skb_copy_to_linear_data(pbuf, imsg,
msg_data_sz(fragm));
/* Prepare buffer for subsequent fragments. */
set_long_msg_seqno(pbuf, long_msg_seq_no);
set_fragm_size(pbuf, fragm_sz);
set_expected_frags(pbuf, exp_fragm_cnt - 1);
} else {
pr_debug("Link unable to reassemble fragmented message\n");
kfree_skb(fbuf);
return -1;
}
kfree_skb(fbuf);
return 0;
} else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
u32 dsz = msg_data_sz(fragm);
u32 fsz = get_fragm_size(pbuf);
u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
u32 exp_frags = get_expected_frags(pbuf) - 1;
skb_copy_to_linear_data_offset(pbuf, crs,
msg_data(fragm), dsz);
kfree_skb(fbuf);
/* Is message complete? */
if (exp_frags == 0) {
if (prev)
prev->next = pbuf->next;
else
*pending = pbuf->next;
msg_reset_reroute_cnt(buf_msg(pbuf));
*fb = pbuf;
*m = buf_msg(pbuf);
return 1;
}
set_expected_frags(pbuf, exp_frags);
int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail,
struct sk_buff **fbuf)
{
struct sk_buff *frag = *fbuf;
struct tipc_msg *msg = buf_msg(frag);
u32 fragid = msg_type(msg);
bool headstolen;
int delta;
skb_pull(frag, msg_hdr_sz(msg));
if (fragid == FIRST_FRAGMENT) {
if (*head || skb_unclone(frag, GFP_ATOMIC))
goto out_free;
*head = frag;
skb_frag_list_init(*head);
return 0;
} else if (skb_try_coalesce(*head, frag, &headstolen, &delta)) {
kfree_skb_partial(frag, headstolen);
} else {
if (!*head)
goto out_free;
if (!skb_has_frag_list(*head))
skb_shinfo(*head)->frag_list = frag;
else
(*tail)->next = frag;
*tail = frag;
(*head)->truesize += frag->truesize;
}
if (fragid == LAST_FRAGMENT) {
*fbuf = *head;
*tail = *head = NULL;
return LINK_REASM_COMPLETE;
}
kfree_skb(fbuf);
return 0;
out_free:
pr_warn_ratelimited("Link unable to reassemble fragmented message\n");
kfree_skb(*fbuf);
return LINK_REASM_ERROR;
}
static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
......
......@@ -40,6 +40,12 @@
#include "msg.h"
#include "node.h"
/*
* Link reassembly status codes
*/
#define LINK_REASM_ERROR -1
#define LINK_REASM_COMPLETE 1
/*
* Out-of-range value for link sequence numbers
*/
......@@ -134,7 +140,8 @@ struct tipc_stats {
* @next_out: ptr to first unsent outbound message in queue
* @waiting_ports: linked list of ports waiting for link congestion to abate
* @long_msg_seq_no: next identifier to use for outbound fragmented messages
* @defragm_buf: list of partially reassembled inbound message fragments
* @reasm_head: list head of partially reassembled inbound message fragments
* @reasm_tail: last fragment received
* @stats: collects statistics regarding link activity
*/
struct tipc_link {
......@@ -196,9 +203,10 @@ struct tipc_link {
struct sk_buff *next_out;
struct list_head waiting_ports;
/* Fragmentation/defragmentation */
/* Fragmentation/reassembly */
u32 long_msg_seq_no;
struct sk_buff *defragm_buf;
struct sk_buff *reasm_head;
struct sk_buff *reasm_tail;
/* Statistics */
struct tipc_stats stats;
......@@ -229,9 +237,9 @@ int tipc_link_send_sections_fast(struct tipc_port *sender,
struct iovec const *msg_sect,
unsigned int len, u32 destnode);
void tipc_link_recv_bundle(struct sk_buff *buf);
int tipc_link_recv_fragment(struct sk_buff **pending,
struct sk_buff **fb,
struct tipc_msg **msg);
int tipc_link_recv_fragment(struct sk_buff **reasm_head,
struct sk_buff **reasm_tail,
struct sk_buff **fbuf);
void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, int prob,
u32 gap, u32 tolerance, u32 priority,
u32 acked_mtu);
......
......@@ -554,12 +554,6 @@ static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n)
msg_set_bits(m, 4, 16, 0xffff, n);
}
static inline u32 msg_fragm_no(struct tipc_msg *m)
{
return msg_bits(m, 4, 16, 0xffff);
}
static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n)
{
msg_set_bits(m, 4, 16, 0xffff, n);
......@@ -576,12 +570,6 @@ static inline void msg_set_next_sent(struct tipc_msg *m, u32 n)
msg_set_bits(m, 4, 0, 0xffff, n);
}
static inline u32 msg_long_msgno(struct tipc_msg *m)
{
return msg_bits(m, 4, 0, 0xffff);
}
static inline void msg_set_long_msgno(struct tipc_msg *m, u32 n)
{
msg_set_bits(m, 4, 0, 0xffff, n);
......
......@@ -298,9 +298,10 @@ static void node_lost_contact(struct tipc_node *n_ptr)
}
n_ptr->bclink.deferred_size = 0;
if (n_ptr->bclink.defragm) {
kfree_skb(n_ptr->bclink.defragm);
n_ptr->bclink.defragm = NULL;
if (n_ptr->bclink.reasm_head) {
kfree_skb(n_ptr->bclink.reasm_head);
n_ptr->bclink.reasm_head = NULL;
n_ptr->bclink.reasm_tail = NULL;
}
tipc_bclink_remove_node(n_ptr->addr);
......
......@@ -74,7 +74,8 @@
* @deferred_size: number of OOS b'cast messages in deferred queue
* @deferred_head: oldest OOS b'cast message received from node
* @deferred_tail: newest OOS b'cast message received from node
* @defragm: list of partially reassembled b'cast message fragments from node
* @reasm_head: broadcast reassembly queue head from node
* @reasm_tail: last broadcast fragment received from node
* @recv_permitted: true if node is allowed to receive b'cast messages
*/
struct tipc_node {
......@@ -98,7 +99,8 @@ struct tipc_node {
u32 deferred_size;
struct sk_buff *deferred_head;
struct sk_buff *deferred_tail;
struct sk_buff *defragm;
struct sk_buff *reasm_head;
struct sk_buff *reasm_tail;
bool recv_permitted;
} bclink;
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment