Commit 1186adf7 authored by Jon Paul Maloy's avatar Jon Paul Maloy Committed by David S. Miller

tipc: simplify message forwarding and rejection in socket layer

Despite recent improvements, the handling of error codes and return
values at reception of messages in the socket layer is still confusing.

In this commit, we try to make it more comprehensible. First, we
separate between the return values coming from the functions called
by tipc_sk_rcv(), -those are TIPC specific error codes, and the
return values returned by tipc_sk_rcv() itself. Second, we don't
use the returned TIPC error code as indication for whether a buffer
should be forwarded/rejected or not; instead we use the buffer pointer
passed along with filter_msg(). This separation is necessary because
we sometimes want to forward messages even when there is no error
(i.e., protocol messages and successfully secondary looked up data
messages).
Reviewed-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent c5898636
...@@ -818,17 +818,14 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf) ...@@ -818,17 +818,14 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
/** /**
* tipc_sk_proto_rcv - receive a connection mng protocol message * tipc_sk_proto_rcv - receive a connection mng protocol message
* @tsk: receiving socket * @tsk: receiving socket
* @dnode: node to send response message to, if any * @skb: pointer to message buffer. Set to NULL if buffer is consumed.
* @buf: buffer containing protocol message
* Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if
* (CONN_PROBE_REPLY) message should be forwarded.
*/ */
static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb)
struct sk_buff *buf)
{ {
struct tipc_msg *msg = buf_msg(buf); struct tipc_msg *msg = buf_msg(*skb);
int conn_cong; int conn_cong;
u32 dnode;
u32 own_node = tsk_own_node(tsk);
/* Ignore if connection cannot be validated: */ /* Ignore if connection cannot be validated: */
if (!tsk_peer_msg(tsk, msg)) if (!tsk_peer_msg(tsk, msg))
goto exit; goto exit;
...@@ -841,15 +838,15 @@ static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, ...@@ -841,15 +838,15 @@ static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode,
if (conn_cong) if (conn_cong)
tsk->sk.sk_write_space(&tsk->sk); tsk->sk.sk_write_space(&tsk->sk);
} else if (msg_type(msg) == CONN_PROBE) { } else if (msg_type(msg) == CONN_PROBE) {
if (!tipc_msg_reverse(tsk_own_node(tsk), buf, dnode, TIPC_OK)) if (tipc_msg_reverse(own_node, *skb, &dnode, TIPC_OK)) {
return TIPC_OK;
msg_set_type(msg, CONN_PROBE_REPLY); msg_set_type(msg, CONN_PROBE_REPLY);
return TIPC_FWD_MSG; return;
}
} }
/* Do nothing if msg_type() == CONN_PROBE_REPLY */ /* Do nothing if msg_type() == CONN_PROBE_REPLY */
exit: exit:
kfree_skb(buf); kfree_skb(*skb);
return TIPC_OK; *skb = NULL;
} }
static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
...@@ -1568,16 +1565,16 @@ static void tipc_data_ready(struct sock *sk) ...@@ -1568,16 +1565,16 @@ static void tipc_data_ready(struct sock *sk)
/** /**
* filter_connect - Handle all incoming messages for a connection-based socket * filter_connect - Handle all incoming messages for a connection-based socket
* @tsk: TIPC socket * @tsk: TIPC socket
* @msg: message * @skb: pointer to message buffer. Set to NULL if buffer is consumed
* *
* Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
*/ */
static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb)
{ {
struct sock *sk = &tsk->sk; struct sock *sk = &tsk->sk;
struct net *net = sock_net(sk); struct net *net = sock_net(sk);
struct socket *sock = sk->sk_socket; struct socket *sock = sk->sk_socket;
struct tipc_msg *msg = buf_msg(*buf); struct tipc_msg *msg = buf_msg(*skb);
int retval = -TIPC_ERR_NO_PORT; int retval = -TIPC_ERR_NO_PORT;
if (msg_mcast(msg)) if (msg_mcast(msg))
...@@ -1627,8 +1624,8 @@ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) ...@@ -1627,8 +1624,8 @@ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
* connect() routine if sleeping. * connect() routine if sleeping.
*/ */
if (msg_data_sz(msg) == 0) { if (msg_data_sz(msg) == 0) {
kfree_skb(*buf); kfree_skb(*skb);
*buf = NULL; *skb = NULL;
if (waitqueue_active(sk_sleep(sk))) if (waitqueue_active(sk_sleep(sk)))
wake_up_interruptible(sk_sleep(sk)); wake_up_interruptible(sk_sleep(sk));
} }
...@@ -1680,32 +1677,33 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf) ...@@ -1680,32 +1677,33 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
/** /**
* filter_rcv - validate incoming message * filter_rcv - validate incoming message
* @sk: socket * @sk: socket
* @buf: message * @skb: pointer to message. Set to NULL if buffer is consumed.
* *
* Enqueues message on receive queue if acceptable; optionally handles * Enqueues message on receive queue if acceptable; optionally handles
* disconnect indication for a connected socket. * disconnect indication for a connected socket.
* *
* Called with socket lock already taken; port lock may also be taken. * Called with socket lock already taken
* *
* Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected
* to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded
*/ */
static int filter_rcv(struct sock *sk, struct sk_buff *buf) static int filter_rcv(struct sock *sk, struct sk_buff **skb)
{ {
struct socket *sock = sk->sk_socket; struct socket *sock = sk->sk_socket;
struct tipc_sock *tsk = tipc_sk(sk); struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *msg = buf_msg(buf); struct tipc_msg *msg = buf_msg(*skb);
unsigned int limit = rcvbuf_limit(sk, buf); unsigned int limit = rcvbuf_limit(sk, *skb);
u32 onode;
int rc = TIPC_OK; int rc = TIPC_OK;
if (unlikely(msg_user(msg) == CONN_MANAGER)) if (unlikely(msg_user(msg) == CONN_MANAGER)) {
return tipc_sk_proto_rcv(tsk, &onode, buf); tipc_sk_proto_rcv(tsk, skb);
return TIPC_OK;
}
if (unlikely(msg_user(msg) == SOCK_WAKEUP)) { if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
kfree_skb(buf); kfree_skb(*skb);
tsk->link_cong = 0; tsk->link_cong = 0;
sk->sk_write_space(sk); sk->sk_write_space(sk);
*skb = NULL;
return TIPC_OK; return TIPC_OK;
} }
...@@ -1717,21 +1715,22 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf) ...@@ -1717,21 +1715,22 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
if (msg_connected(msg)) if (msg_connected(msg))
return -TIPC_ERR_NO_PORT; return -TIPC_ERR_NO_PORT;
} else { } else {
rc = filter_connect(tsk, &buf); rc = filter_connect(tsk, skb);
if (rc != TIPC_OK || buf == NULL) if (rc != TIPC_OK || !*skb)
return rc; return rc;
} }
/* Reject message if there isn't room to queue it */ /* Reject message if there isn't room to queue it */
if (sk_rmem_alloc_get(sk) + buf->truesize >= limit) if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit)
return -TIPC_ERR_OVERLOAD; return -TIPC_ERR_OVERLOAD;
/* Enqueue message */ /* Enqueue message */
TIPC_SKB_CB(buf)->handle = NULL; TIPC_SKB_CB(*skb)->handle = NULL;
__skb_queue_tail(&sk->sk_receive_queue, buf); __skb_queue_tail(&sk->sk_receive_queue, *skb);
skb_set_owner_r(buf, sk); skb_set_owner_r(*skb, sk);
sk->sk_data_ready(sk); sk->sk_data_ready(sk);
*skb = NULL;
return TIPC_OK; return TIPC_OK;
} }
...@@ -1746,25 +1745,22 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf) ...@@ -1746,25 +1745,22 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
*/ */
static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
{ {
int rc; int err;
u32 onode; atomic_t *dcnt;
u32 dnode;
struct tipc_sock *tsk = tipc_sk(sk); struct tipc_sock *tsk = tipc_sk(sk);
struct net *net = sock_net(sk); struct net *net = sock_net(sk);
uint truesize = skb->truesize; uint truesize = skb->truesize;
rc = filter_rcv(sk, skb); err = filter_rcv(sk, &skb);
if (likely(!skb)) {
if (likely(!rc)) { dcnt = &tsk->dupl_rcvcnt;
if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT)
atomic_add(truesize, &tsk->dupl_rcvcnt); atomic_add(truesize, dcnt);
return 0; return 0;
} }
if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err))
if ((rc < 0) && !tipc_msg_reverse(tsk_own_node(tsk), skb, &onode, -rc)) tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
return 0;
tipc_link_xmit_skb(net, skb, onode, 0);
return 0; return 0;
} }
...@@ -1780,14 +1776,14 @@ int tipc_sk_rcv(struct net *net, struct sk_buff *skb) ...@@ -1780,14 +1776,14 @@ int tipc_sk_rcv(struct net *net, struct sk_buff *skb)
struct tipc_net *tn; struct tipc_net *tn;
struct sock *sk; struct sock *sk;
u32 dport = msg_destport(buf_msg(skb)); u32 dport = msg_destport(buf_msg(skb));
int rc = TIPC_OK; int err = TIPC_OK;
uint limit; uint limit;
u32 dnode; u32 dnode;
/* Validate destination and message */ /* Validate destination and message */
tsk = tipc_sk_lookup(net, dport); tsk = tipc_sk_lookup(net, dport);
if (unlikely(!tsk)) { if (unlikely(!tsk)) {
rc = tipc_msg_eval(net, skb, &dnode); err = tipc_msg_eval(net, skb, &dnode);
goto exit; goto exit;
} }
sk = &tsk->sk; sk = &tsk->sk;
...@@ -1796,25 +1792,25 @@ int tipc_sk_rcv(struct net *net, struct sk_buff *skb) ...@@ -1796,25 +1792,25 @@ int tipc_sk_rcv(struct net *net, struct sk_buff *skb)
spin_lock_bh(&sk->sk_lock.slock); spin_lock_bh(&sk->sk_lock.slock);
if (!sock_owned_by_user(sk)) { if (!sock_owned_by_user(sk)) {
rc = filter_rcv(sk, skb); err = filter_rcv(sk, &skb);
} else { } else {
if (sk->sk_backlog.len == 0) if (sk->sk_backlog.len == 0)
atomic_set(&tsk->dupl_rcvcnt, 0); atomic_set(&tsk->dupl_rcvcnt, 0);
limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt); limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt);
if (sk_add_backlog(sk, skb, limit)) if (likely(!sk_add_backlog(sk, skb, limit)))
rc = -TIPC_ERR_OVERLOAD; skb = NULL;
else
err = -TIPC_ERR_OVERLOAD;
} }
spin_unlock_bh(&sk->sk_lock.slock); spin_unlock_bh(&sk->sk_lock.slock);
sock_put(sk); sock_put(sk);
if (likely(!rc))
return 0;
exit: exit:
if (unlikely(skb)) {
tn = net_generic(net, tipc_net_id); tn = net_generic(net, tipc_net_id);
if ((rc < 0) && !tipc_msg_reverse(tn->own_addr, skb, &dnode, -rc)) if (!err || tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
return -EHOSTUNREACH;
tipc_link_xmit_skb(net, skb, dnode, 0); tipc_link_xmit_skb(net, skb, dnode, 0);
return (rc < 0) ? -EHOSTUNREACH : 0; }
return err ? -EHOSTUNREACH : 0;
} }
static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment