Commit ba501666 authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc_net-next_v2' of git://git.kernel.org/pub/scm/linux/kernel/git/paulg/linux

Paul Gortmaker says:

====================
Changes since v1:
	-get rid of essentially unused variable spotted by
	 Neil Horman (patch #2)

	-drop patch #3; defer it for 3.9 content, so Neil,
	 Jon and Ying can discuss its specifics at their
	 leisure while net-next is closed.  (It had no
	 direct dependencies to the rest of the series, and
	 was just an optimization)

	-fix indentation of accept() code directly in place
	 vs. forking it out to a separate function (was patch
	 #10, now patch #9).

Rebuilt and re-ran tests just to ensure nothing odd happened.

Original v1 text follows, updated pull information follows that.

           ---------

Here is another batch of TIPC changes.  The most interesting
thing is probably the non-blocking socket connect - I'm told
there were several users looking forward to seeing this.

Also there were some resource limitation changes that had
the right intent back in 2005, but were now apparently causing
needless limitations to people's real use cases; those have
been relaxed/removed.

There is a lockdep splat fix, but no need for a stable backport,
since it is virtually impossible to trigger in mainline; you
have to essentially modify code to force the probabilities
in your favour to see it.

The rest can largely be categorized as general cleanup of things
seen in the process of getting the above changes done.

Tested between 64 and 32 bit nodes with the test suite.  I've
also compile tested all the individual commits on the chain.

I'd originally figured on this queue not being ready for 3.8, but
the extended stabilization window of 3.7 has changed that.  On
the other hand, this can still be 3.9 material, if that simply
works better for folks - no problem for me to defer it to 2013.
If anyone spots any problems then I'll definitely defer it,
rather than rush a last minute respin.
===================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents c772dde3 0fef8f20
......@@ -97,7 +97,6 @@ static int link_send_sections_long(struct tipc_port *sender,
struct iovec const *msg_sect,
u32 num_sect, unsigned int total_len,
u32 destnode);
static void link_check_defragm_bufs(struct tipc_link *l_ptr);
static void link_state_event(struct tipc_link *l_ptr, u32 event);
static void link_reset_statistics(struct tipc_link *l_ptr);
static void link_print(struct tipc_link *l_ptr, const char *str);
......@@ -271,7 +270,6 @@ static void link_timeout(struct tipc_link *l_ptr)
}
/* do all other link processing performed on a periodic basis */
link_check_defragm_bufs(l_ptr);
link_state_event(l_ptr, TIMEOUT_EVT);
......@@ -2497,16 +2495,6 @@ static void set_expected_frags(struct sk_buff *buf, u32 exp)
msg_set_bcast_ack(buf_msg(buf), exp);
}
static u32 get_timer_cnt(struct sk_buff *buf)
{
return msg_reroute_cnt(buf_msg(buf));
}
static void incr_timer_cnt(struct sk_buff *buf)
{
msg_incr_reroute_cnt(buf_msg(buf));
}
/*
* tipc_link_recv_fragment(): Called with node lock on. Returns
* the reassembled buffer if message is complete.
......@@ -2585,38 +2573,6 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
return 0;
}
/**
* link_check_defragm_bufs - flush stale incoming message fragments
* @l_ptr: pointer to link
*/
static void link_check_defragm_bufs(struct tipc_link *l_ptr)
{
struct sk_buff *prev = NULL;
struct sk_buff *next = NULL;
struct sk_buff *buf = l_ptr->defragm_buf;
if (!buf)
return;
if (!link_working_working(l_ptr))
return;
while (buf) {
u32 cnt = get_timer_cnt(buf);
next = buf->next;
if (cnt < 4) {
incr_timer_cnt(buf);
prev = buf;
} else {
if (prev)
prev->next = buf->next;
else
l_ptr->defragm_buf = buf->next;
kfree_skb(buf);
}
buf = next;
}
}
static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
{
if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
......
......@@ -726,7 +726,7 @@ static void port_dispatcher_sigh(void *dummy)
if (unlikely(!cb))
goto reject;
if (unlikely(!connected)) {
if (tipc_connect2port(dref, &orig))
if (tipc_connect(dref, &orig))
goto reject;
} else if (peer_invalid)
goto reject;
......@@ -1036,15 +1036,30 @@ int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
return res;
}
int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
int tipc_connect(u32 ref, struct tipc_portid const *peer)
{
struct tipc_port *p_ptr;
struct tipc_msg *msg;
int res = -EINVAL;
int res;
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
return -EINVAL;
res = __tipc_connect(ref, p_ptr, peer);
tipc_port_unlock(p_ptr);
return res;
}
/*
* __tipc_connect - connect to a remote peer
*
* Port must be locked.
*/
int __tipc_connect(u32 ref, struct tipc_port *p_ptr,
struct tipc_portid const *peer)
{
struct tipc_msg *msg;
int res = -EINVAL;
if (p_ptr->published || p_ptr->connected)
goto exit;
if (!peer->ref)
......@@ -1067,17 +1082,16 @@ int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
(net_ev_handler)port_handle_node_down);
res = 0;
exit:
tipc_port_unlock(p_ptr);
p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
return res;
}
/**
* tipc_disconnect_port - disconnect port from peer
/*
* __tipc_disconnect - disconnect port from peer
*
* Port must be locked.
*/
int tipc_disconnect_port(struct tipc_port *tp_ptr)
int __tipc_disconnect(struct tipc_port *tp_ptr)
{
int res;
......@@ -1104,7 +1118,7 @@ int tipc_disconnect(u32 ref)
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
return -EINVAL;
res = tipc_disconnect_port(p_ptr);
res = __tipc_disconnect(p_ptr);
tipc_port_unlock(p_ptr);
return res;
}
......
......@@ -190,7 +190,7 @@ int tipc_publish(u32 portref, unsigned int scope,
int tipc_withdraw(u32 portref, unsigned int scope,
struct tipc_name_seq const *name_seq);
int tipc_connect2port(u32 portref, struct tipc_portid const *port);
int tipc_connect(u32 portref, struct tipc_portid const *port);
int tipc_disconnect(u32 portref);
......@@ -200,7 +200,9 @@ int tipc_shutdown(u32 ref);
/*
* The following routines require that the port be locked on entry
*/
int tipc_disconnect_port(struct tipc_port *tp_ptr);
int __tipc_disconnect(struct tipc_port *tp_ptr);
int __tipc_connect(u32 ref, struct tipc_port *p_ptr,
struct tipc_portid const *peer);
int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg);
/*
......
/*
* net/tipc/socket.c: TIPC socket API
*
* Copyright (c) 2001-2007, Ericsson AB
* Copyright (c) 2004-2008, 2010-2011, Wind River Systems
* Copyright (c) 2001-2007, 2012 Ericsson AB
* Copyright (c) 2004-2008, 2010-2012, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
......@@ -43,7 +43,7 @@
#define SS_LISTENING -1 /* socket is listening */
#define SS_READY -2 /* socket is connectionless */
#define OVERLOAD_LIMIT_BASE 5000
#define OVERLOAD_LIMIT_BASE 10000
#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
struct tipc_sock {
......@@ -73,8 +73,6 @@ static struct proto tipc_proto;
static int sockets_enabled;
static atomic_t tipc_queue_size = ATOMIC_INIT(0);
/*
* Revised TIPC socket locking policy:
*
......@@ -128,7 +126,6 @@ static atomic_t tipc_queue_size = ATOMIC_INIT(0);
static void advance_rx_queue(struct sock *sk)
{
kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
atomic_dec(&tipc_queue_size);
}
/**
......@@ -140,10 +137,8 @@ static void discard_rx_queue(struct sock *sk)
{
struct sk_buff *buf;
while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
atomic_dec(&tipc_queue_size);
while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
kfree_skb(buf);
}
}
/**
......@@ -155,10 +150,8 @@ static void reject_rx_queue(struct sock *sk)
{
struct sk_buff *buf;
while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
atomic_dec(&tipc_queue_size);
}
}
/**
......@@ -280,7 +273,6 @@ static int release(struct socket *sock)
buf = __skb_dequeue(&sk->sk_receive_queue);
if (buf == NULL)
break;
atomic_dec(&tipc_queue_size);
if (TIPC_SKB_CB(buf)->handle != 0)
kfree_skb(buf);
else {
......@@ -783,16 +775,19 @@ static int send_stream(struct kiocb *iocb, struct socket *sock,
static int auto_connect(struct socket *sock, struct tipc_msg *msg)
{
struct tipc_sock *tsock = tipc_sk(sock->sk);
if (msg_errcode(msg)) {
sock->state = SS_DISCONNECTING;
return -ECONNREFUSED;
}
struct tipc_port *p_ptr;
tsock->peer_name.ref = msg_origport(msg);
tsock->peer_name.node = msg_orignode(msg);
tipc_connect2port(tsock->p->ref, &tsock->peer_name);
tipc_set_portimportance(tsock->p->ref, msg_importance(msg));
p_ptr = tipc_port_deref(tsock->p->ref);
if (!p_ptr)
return -EINVAL;
__tipc_connect(tsock->p->ref, p_ptr, &tsock->peer_name);
if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)
return -EINVAL;
msg_set_importance(&p_ptr->phdr, (u32)msg_importance(msg));
sock->state = SS_CONNECTED;
return 0;
}
......@@ -951,13 +946,6 @@ static int recv_msg(struct kiocb *iocb, struct socket *sock,
sz = msg_data_sz(msg);
err = msg_errcode(msg);
/* Complete connection setup for an implied connect */
if (unlikely(sock->state == SS_CONNECTING)) {
res = auto_connect(sock, msg);
if (res)
goto exit;
}
/* Discard an empty non-errored message & try again */
if ((!sz) && (!err)) {
advance_rx_queue(sk);
......@@ -1194,6 +1182,83 @@ static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base)
return queue_size >= threshold;
}
/**
* filter_connect - Handle all incoming messages for a connection-based socket
* @tsock: TIPC socket
* @msg: message
*
* Returns TIPC error status code and socket error status code
* once it encounters some errors
*/
static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf)
{
struct socket *sock = tsock->sk.sk_socket;
struct tipc_msg *msg = buf_msg(*buf);
struct sock *sk = &tsock->sk;
u32 retval = TIPC_ERR_NO_PORT;
int res;
if (msg_mcast(msg))
return retval;
switch ((int)sock->state) {
case SS_CONNECTED:
/* Accept only connection-based messages sent by peer */
if (msg_connected(msg) && tipc_port_peer_msg(tsock->p, msg)) {
if (unlikely(msg_errcode(msg))) {
sock->state = SS_DISCONNECTING;
__tipc_disconnect(tsock->p);
}
retval = TIPC_OK;
}
break;
case SS_CONNECTING:
/* Accept only ACK or NACK message */
if (unlikely(msg_errcode(msg))) {
sock->state = SS_DISCONNECTING;
sk->sk_err = -ECONNREFUSED;
retval = TIPC_OK;
break;
}
if (unlikely(!msg_connected(msg)))
break;
res = auto_connect(sock, msg);
if (res) {
sock->state = SS_DISCONNECTING;
sk->sk_err = res;
retval = TIPC_OK;
break;
}
/* If an incoming message is an 'ACK-', it should be
* discarded here because it doesn't contain useful
* data. In addition, we should try to wake up
* connect() routine if sleeping.
*/
if (msg_data_sz(msg) == 0) {
kfree_skb(*buf);
*buf = NULL;
if (waitqueue_active(sk_sleep(sk)))
wake_up_interruptible(sk_sleep(sk));
}
retval = TIPC_OK;
break;
case SS_LISTENING:
case SS_UNCONNECTED:
/* Accept only SYN message */
if (!msg_connected(msg) && !(msg_errcode(msg)))
retval = TIPC_OK;
break;
case SS_DISCONNECTING:
break;
default:
pr_err("Unknown socket state %u\n", sock->state);
}
return retval;
}
/**
* filter_rcv - validate incoming message
* @sk: socket
......@@ -1211,6 +1276,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
struct socket *sock = sk->sk_socket;
struct tipc_msg *msg = buf_msg(buf);
u32 recv_q_len;
u32 res = TIPC_OK;
/* Reject message if it is wrong sort of message for socket */
if (msg_type(msg) > TIPC_DIRECT_MSG)
......@@ -1220,32 +1286,12 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
if (msg_connected(msg))
return TIPC_ERR_NO_PORT;
} else {
if (msg_mcast(msg))
return TIPC_ERR_NO_PORT;
if (sock->state == SS_CONNECTED) {
if (!msg_connected(msg) ||
!tipc_port_peer_msg(tipc_sk_port(sk), msg))
return TIPC_ERR_NO_PORT;
} else if (sock->state == SS_CONNECTING) {
if (!msg_connected(msg) && (msg_errcode(msg) == 0))
return TIPC_ERR_NO_PORT;
} else if (sock->state == SS_LISTENING) {
if (msg_connected(msg) || msg_errcode(msg))
return TIPC_ERR_NO_PORT;
} else if (sock->state == SS_DISCONNECTING) {
return TIPC_ERR_NO_PORT;
} else /* (sock->state == SS_UNCONNECTED) */ {
if (msg_connected(msg) || msg_errcode(msg))
return TIPC_ERR_NO_PORT;
}
res = filter_connect(tipc_sk(sk), &buf);
if (res != TIPC_OK || buf == NULL)
return res;
}
/* Reject message if there isn't room to queue it */
recv_q_len = (u32)atomic_read(&tipc_queue_size);
if (unlikely(recv_q_len >= OVERLOAD_LIMIT_BASE)) {
if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE))
return TIPC_ERR_OVERLOAD;
}
recv_q_len = skb_queue_len(&sk->sk_receive_queue);
if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) {
if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2))
......@@ -1254,15 +1300,8 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
/* Enqueue message (finally!) */
TIPC_SKB_CB(buf)->handle = 0;
atomic_inc(&tipc_queue_size);
__skb_queue_tail(&sk->sk_receive_queue, buf);
/* Initiate connection termination for an incoming 'FIN' */
if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) {
sock->state = SS_DISCONNECTING;
tipc_disconnect_port(tipc_sk_port(sk));
}
sk->sk_data_ready(sk, 0);
return TIPC_OK;
}
......@@ -1348,8 +1387,6 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
struct sock *sk = sock->sk;
struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
struct msghdr m = {NULL,};
struct sk_buff *buf;
struct tipc_msg *msg;
unsigned int timeout;
int res;
......@@ -1361,26 +1398,6 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
goto exit;
}
/* For now, TIPC does not support the non-blocking form of connect() */
if (flags & O_NONBLOCK) {
res = -EOPNOTSUPP;
goto exit;
}
/* Issue Posix-compliant error code if socket is in the wrong state */
if (sock->state == SS_LISTENING) {
res = -EOPNOTSUPP;
goto exit;
}
if (sock->state == SS_CONNECTING) {
res = -EALREADY;
goto exit;
}
if (sock->state != SS_UNCONNECTED) {
res = -EISCONN;
goto exit;
}
/*
* Reject connection attempt using multicast address
*
......@@ -1392,49 +1409,66 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
goto exit;
}
/* Reject any messages already in receive queue (very unlikely) */
reject_rx_queue(sk);
timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
switch (sock->state) {
case SS_UNCONNECTED:
/* Send a 'SYN-' to destination */
m.msg_name = dest;
m.msg_namelen = destlen;
/* If connect is in non-blocking case, set MSG_DONTWAIT to
* indicate send_msg() is never blocked.
*/
if (!timeout)
m.msg_flags = MSG_DONTWAIT;
res = send_msg(NULL, sock, &m, 0);
if (res < 0)
if ((res < 0) && (res != -EWOULDBLOCK))
goto exit;
/* Just entered SS_CONNECTING state; the only
* difference is that return value in non-blocking
* case is EINPROGRESS, rather than EALREADY.
*/
res = -EINPROGRESS;
break;
case SS_CONNECTING:
res = -EALREADY;
break;
case SS_CONNECTED:
res = -EISCONN;
break;
default:
res = -EINVAL;
goto exit;
}
if (sock->state == SS_CONNECTING) {
if (!timeout)
goto exit;
/* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
timeout = tipc_sk(sk)->conn_timeout;
release_sock(sk);
res = wait_event_interruptible_timeout(*sk_sleep(sk),
(!skb_queue_empty(&sk->sk_receive_queue) ||
(sock->state != SS_CONNECTING)),
sock->state != SS_CONNECTING,
timeout ? (long)msecs_to_jiffies(timeout)
: MAX_SCHEDULE_TIMEOUT);
lock_sock(sk);
if (res > 0) {
buf = skb_peek(&sk->sk_receive_queue);
if (buf != NULL) {
msg = buf_msg(buf);
res = auto_connect(sock, msg);
if (!res) {
if (!msg_data_sz(msg))
advance_rx_queue(sk);
}
} else {
if (sock->state == SS_CONNECTED)
res = -EISCONN;
else
res = -ECONNREFUSED;
}
} else {
if (res <= 0) {
if (res == 0)
res = -ETIMEDOUT;
else
; /* leave "res" unchanged */
sock->state = SS_DISCONNECTING;
goto exit;
}
}
if (unlikely(sock->state == SS_DISCONNECTING))
res = sock_error(sk);
else
res = 0;
exit:
release_sock(sk);
return res;
......@@ -1475,8 +1509,13 @@ static int listen(struct socket *sock, int len)
*/
static int accept(struct socket *sock, struct socket *new_sock, int flags)
{
struct sock *sk = sock->sk;
struct sock *new_sk, *sk = sock->sk;
struct sk_buff *buf;
struct tipc_sock *new_tsock;
struct tipc_port *new_tport;
struct tipc_msg *msg;
u32 new_ref;
int res;
lock_sock(sk);
......@@ -1502,14 +1541,17 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
buf = skb_peek(&sk->sk_receive_queue);
res = tipc_create(sock_net(sock->sk), new_sock, 0, 0);
if (!res) {
struct sock *new_sk = new_sock->sk;
struct tipc_sock *new_tsock = tipc_sk(new_sk);
struct tipc_port *new_tport = new_tsock->p;
u32 new_ref = new_tport->ref;
struct tipc_msg *msg = buf_msg(buf);
if (res)
goto exit;
lock_sock(new_sk);
new_sk = new_sock->sk;
new_tsock = tipc_sk(new_sk);
new_tport = new_tsock->p;
new_ref = new_tport->ref;
msg = buf_msg(buf);
/* we lock on new_sk; but lockdep sees the lock on sk */
lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
/*
* Reject any stray messages received by new socket
......@@ -1520,7 +1562,7 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
/* Connect new socket to it's peer */
new_tsock->peer_name.ref = msg_origport(msg);
new_tsock->peer_name.node = msg_orignode(msg);
tipc_connect2port(new_ref, &new_tsock->peer_name);
tipc_connect(new_ref, &new_tsock->peer_name);
new_sock->state = SS_CONNECTED;
tipc_set_portimportance(new_ref, msg_importance(msg));
......@@ -1543,7 +1585,7 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
__skb_queue_head(&new_sk->sk_receive_queue, buf);
}
release_sock(new_sk);
}
exit:
release_sock(sk);
return res;
......@@ -1578,7 +1620,6 @@ static int shutdown(struct socket *sock, int how)
/* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
buf = __skb_dequeue(&sk->sk_receive_queue);
if (buf) {
atomic_dec(&tipc_queue_size);
if (TIPC_SKB_CB(buf)->handle != 0) {
kfree_skb(buf);
goto restart;
......@@ -1717,7 +1758,7 @@ static int getsockopt(struct socket *sock,
/* no need to set "res", since already 0 at this point */
break;
case TIPC_NODE_RECVQ_DEPTH:
value = (u32)atomic_read(&tipc_queue_size);
value = 0; /* was tipc_queue_size, now obsolete */
break;
case TIPC_SOCK_RECVQ_DEPTH:
value = skb_queue_len(&sk->sk_receive_queue);
......
......@@ -462,7 +462,7 @@ static void subscr_named_msg_event(void *usr_handle,
kfree(subscriber);
return;
}
tipc_connect2port(subscriber->port_ref, orig);
tipc_connect(subscriber->port_ref, orig);
/* Lock server port (& save lock address for future use) */
subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment