Commit 5d15af67 authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc-refactor-socket-receive-functions'

Jon Maloy says:

====================
tipc: refactor socket receive functions

We try to make the functions tipc_sk_recvmsg() and
tipc_sk_recvstream() more readable.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents b0e92279 ec8a09fb
...@@ -51,6 +51,7 @@ ...@@ -51,6 +51,7 @@
#define TIPC_FWD_MSG 1 #define TIPC_FWD_MSG 1
#define TIPC_MAX_PORT 0xffffffff #define TIPC_MAX_PORT 0xffffffff
#define TIPC_MIN_PORT 1 #define TIPC_MIN_PORT 1
#define TIPC_ACK_RATE 4 /* ACK at 1/4 of of rcv window size */
enum { enum {
TIPC_LISTEN = TCP_LISTEN, TIPC_LISTEN = TCP_LISTEN,
...@@ -1290,7 +1291,7 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) ...@@ -1290,7 +1291,7 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
/** /**
* tipc_recvmsg - receive packet-oriented message * tipc_recvmsg - receive packet-oriented message
* @m: descriptor for message info * @m: descriptor for message info
* @buf_len: total size of user buffer area * @buflen: length of user buffer area
* @flags: receive flags * @flags: receive flags
* *
* Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages. * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
...@@ -1298,95 +1299,85 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) ...@@ -1298,95 +1299,85 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
* *
* Returns size of returned message data, errno otherwise * Returns size of returned message data, errno otherwise
*/ */
static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len, static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
int flags) size_t buflen, int flags)
{ {
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk); struct tipc_sock *tsk = tipc_sk(sk);
struct sk_buff *buf; struct sk_buff *skb;
struct tipc_msg *msg; struct tipc_msg *hdr;
bool is_connectionless = tipc_sk_type_connectionless(sk); bool connected = !tipc_sk_type_connectionless(sk);
long timeo; int rc, err, hlen, dlen, copy;
unsigned int sz; long timeout;
u32 err;
int res, hlen;
/* Catch invalid receive requests */ /* Catch invalid receive requests */
if (unlikely(!buf_len)) if (unlikely(!buflen))
return -EINVAL; return -EINVAL;
lock_sock(sk); lock_sock(sk);
if (unlikely(connected && sk->sk_state == TIPC_OPEN)) {
if (!is_connectionless && unlikely(sk->sk_state == TIPC_OPEN)) { rc = -ENOTCONN;
res = -ENOTCONN;
goto exit; goto exit;
} }
timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); do {
restart: /* Look at first msg in receive queue; wait if necessary */
rc = tipc_wait_for_rcvmsg(sock, &timeout);
/* Look for a message in receive queue; wait if necessary */ if (unlikely(rc))
res = tipc_wait_for_rcvmsg(sock, &timeo); goto exit;
if (res) skb = skb_peek(&sk->sk_receive_queue);
goto exit; hdr = buf_msg(skb);
dlen = msg_data_sz(hdr);
/* Look at first message in receive queue */ hlen = msg_hdr_sz(hdr);
buf = skb_peek(&sk->sk_receive_queue); err = msg_errcode(hdr);
msg = buf_msg(buf); if (likely(dlen || err))
sz = msg_data_sz(msg); break;
hlen = msg_hdr_sz(msg);
err = msg_errcode(msg);
/* Discard an empty non-errored message & try again */
if ((!sz) && (!err)) {
tsk_advance_rx_queue(sk); tsk_advance_rx_queue(sk);
goto restart; } while (1);
}
/* Capture sender's address (optional) */
set_orig_addr(m, msg);
/* Capture ancillary data (optional) */ /* Collect msg meta data, including error code and rejected data */
res = tipc_sk_anc_data_recv(m, msg, tsk); set_orig_addr(m, hdr);
if (res) rc = tipc_sk_anc_data_recv(m, hdr, tsk);
if (unlikely(rc))
goto exit; goto exit;
/* Capture message data (if valid) & compute return value (always) */ /* Capture data if non-error msg, otherwise just set return value */
if (!err) { if (likely(!err)) {
if (unlikely(buf_len < sz)) { copy = min_t(int, dlen, buflen);
sz = buf_len; if (unlikely(copy != dlen))
m->msg_flags |= MSG_TRUNC; m->msg_flags |= MSG_TRUNC;
} rc = skb_copy_datagram_msg(skb, hlen, m, copy);
res = skb_copy_datagram_msg(buf, hlen, m, sz);
if (res)
goto exit;
res = sz;
} else { } else {
if (is_connectionless || err == TIPC_CONN_SHUTDOWN || copy = 0;
m->msg_control) rc = 0;
res = 0; if (err != TIPC_CONN_SHUTDOWN && connected && !m->msg_control)
else rc = -ECONNRESET;
res = -ECONNRESET;
} }
if (unlikely(rc))
goto exit;
/* Caption of data or error code/rejected data was successful */
if (unlikely(flags & MSG_PEEK)) if (unlikely(flags & MSG_PEEK))
goto exit; goto exit;
if (likely(!is_connectionless)) {
tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
tipc_sk_send_ack(tsk);
}
tsk_advance_rx_queue(sk); tsk_advance_rx_queue(sk);
if (likely(!connected))
goto exit;
/* Send connection flow control ack when applicable */
tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
tipc_sk_send_ack(tsk);
exit: exit:
release_sock(sk); release_sock(sk);
return res; return rc ? rc : copy;
} }
/** /**
* tipc_recv_stream - receive stream-oriented data * tipc_recvstream - receive stream-oriented data
* @m: descriptor for message info * @m: descriptor for message info
* @buf_len: total size of user buffer area * @buflen: total size of user buffer area
* @flags: receive flags * @flags: receive flags
* *
* Used for SOCK_STREAM messages only. If not enough data is available * Used for SOCK_STREAM messages only. If not enough data is available
...@@ -1394,111 +1385,98 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len, ...@@ -1394,111 +1385,98 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
* *
* Returns size of returned message data, errno otherwise * Returns size of returned message data, errno otherwise
*/ */
static int tipc_recv_stream(struct socket *sock, struct msghdr *m, static int tipc_recvstream(struct socket *sock, struct msghdr *m,
size_t buf_len, int flags) size_t buflen, int flags)
{ {
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk); struct tipc_sock *tsk = tipc_sk(sk);
struct sk_buff *buf; struct sk_buff *skb;
struct tipc_msg *msg; struct tipc_msg *hdr;
long timeo; struct tipc_skb_cb *skb_cb;
unsigned int sz; bool peek = flags & MSG_PEEK;
int target; int offset, required, copy, copied = 0;
int sz_copied = 0; int hlen, dlen, err, rc;
u32 err; long timeout;
int res = 0, hlen;
/* Catch invalid receive attempts */ /* Catch invalid receive attempts */
if (unlikely(!buf_len)) if (unlikely(!buflen))
return -EINVAL; return -EINVAL;
lock_sock(sk); lock_sock(sk);
if (unlikely(sk->sk_state == TIPC_OPEN)) { if (unlikely(sk->sk_state == TIPC_OPEN)) {
res = -ENOTCONN; rc = -ENOTCONN;
goto exit;
}
target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
restart:
/* Look for a message in receive queue; wait if necessary */
res = tipc_wait_for_rcvmsg(sock, &timeo);
if (res)
goto exit; goto exit;
/* Look at first message in receive queue */
buf = skb_peek(&sk->sk_receive_queue);
msg = buf_msg(buf);
sz = msg_data_sz(msg);
hlen = msg_hdr_sz(msg);
err = msg_errcode(msg);
/* Discard an empty non-errored message & try again */
if ((!sz) && (!err)) {
tsk_advance_rx_queue(sk);
goto restart;
}
/* Optionally capture sender's address & ancillary data of first msg */
if (sz_copied == 0) {
set_orig_addr(m, msg);
res = tipc_sk_anc_data_recv(m, msg, tsk);
if (res)
goto exit;
} }
required = sock_rcvlowat(sk, flags & MSG_WAITALL, buflen);
timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
/* Capture message data (if valid) & compute return value (always) */ do {
if (!err) { /* Look at first msg in receive queue; wait if necessary */
u32 offset = TIPC_SKB_CB(buf)->bytes_read; rc = tipc_wait_for_rcvmsg(sock, &timeout);
u32 needed; if (unlikely(rc))
int sz_to_copy; break;
skb = skb_peek(&sk->sk_receive_queue);
sz -= offset; skb_cb = TIPC_SKB_CB(skb);
needed = (buf_len - sz_copied); hdr = buf_msg(skb);
sz_to_copy = min(sz, needed); dlen = msg_data_sz(hdr);
hlen = msg_hdr_sz(hdr);
res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy); err = msg_errcode(hdr);
if (res)
goto exit; /* Discard any empty non-errored (SYN-) message */
if (unlikely(!dlen && !err)) {
tsk_advance_rx_queue(sk);
continue;
}
sz_copied += sz_to_copy; /* Collect msg meta data, incl. error code and rejected data */
if (!copied) {
set_orig_addr(m, hdr);
rc = tipc_sk_anc_data_recv(m, hdr, tsk);
if (rc)
break;
}
if (sz_to_copy < sz) { /* Copy data if msg ok, otherwise return error/partial data */
if (!(flags & MSG_PEEK)) if (likely(!err)) {
TIPC_SKB_CB(buf)->bytes_read = offset = skb_cb->bytes_read;
offset + sz_to_copy; copy = min_t(int, dlen - offset, buflen - copied);
goto exit; rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
if (unlikely(rc))
break;
copied += copy;
offset += copy;
if (unlikely(offset < dlen)) {
if (!peek)
skb_cb->bytes_read = offset;
break;
}
} else {
rc = 0;
if ((err != TIPC_CONN_SHUTDOWN) && !m->msg_control)
rc = -ECONNRESET;
if (copied || rc)
break;
} }
} else {
if (sz_copied != 0)
goto exit; /* can't add error msg to valid data */
if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control) if (unlikely(peek))
res = 0; break;
else
res = -ECONNRESET;
}
if (unlikely(flags & MSG_PEEK)) tsk_advance_rx_queue(sk);
goto exit;
tsk->rcv_unacked += tsk_inc(tsk, hlen + msg_data_sz(msg)); /* Send connection flow control advertisement when applicable */
if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4))) tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
tipc_sk_send_ack(tsk); if (unlikely(tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE))
tsk_advance_rx_queue(sk); tipc_sk_send_ack(tsk);
/* Loop around if more data is required */ /* Exit if all requested data or FIN/error received */
if ((sz_copied < buf_len) && /* didn't get all requested data */ if (copied == buflen || err)
(!skb_queue_empty(&sk->sk_receive_queue) || break;
(sz_copied < target)) && /* and more is ready or required */
(!err)) /* and haven't reached a FIN */
goto restart;
} while (!skb_queue_empty(&sk->sk_receive_queue) || copied < required);
exit: exit:
release_sock(sk); release_sock(sk);
return sz_copied ? sz_copied : res; return copied ? copied : rc;
} }
/** /**
...@@ -2593,7 +2571,7 @@ static const struct proto_ops stream_ops = { ...@@ -2593,7 +2571,7 @@ static const struct proto_ops stream_ops = {
.setsockopt = tipc_setsockopt, .setsockopt = tipc_setsockopt,
.getsockopt = tipc_getsockopt, .getsockopt = tipc_getsockopt,
.sendmsg = tipc_sendstream, .sendmsg = tipc_sendstream,
.recvmsg = tipc_recv_stream, .recvmsg = tipc_recvstream,
.mmap = sock_no_mmap, .mmap = sock_no_mmap,
.sendpage = sock_no_sendpage .sendpage = sock_no_sendpage
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment