Commit e387819a, authored by Trond Myklebust and committed by Linus Torvalds

[PATCH] Do RPC over TCP reply message delivery in sock->data_ready()

xprt.c:
  Speed up synchronous RPC over TCP calls by having the
  replies delivered by the IPV4 "bottom half", instead of
  switching to the rpciod process in order to call recvmsg().
   - Remove sock_recvmsg() interface.
   - Remove rpc_xprt_pending list and rpciod_tcp_dispatcher() interface.
   - Use the new tcp_read_sock() interface to deliver data directly
     from within tcp_data_ready().
sched.c:
   - Remove references to rpciod_tcp_dispatcher.
xprt.h:
   - New set of flags to reflect the TCP record read state.

Cheers,
  Trond
parent d755a07e
......@@ -120,6 +120,11 @@ struct rpc_rqst {
#define rq_rnr rq_rcv_buf.io_nr
#define rq_rlen rq_rcv_buf.io_len
/*
 * Bits kept in rpc_xprt.tcp_flags; together they record how far the
 * receive path has got in parsing the current TCP record.
 */
/* The current fragment header had its top bit (0x80000000) set. */
#define XPRT_LAST_FRAG (1 << 0)
/* The next bytes to consume belong to a record fragment header. */
#define XPRT_COPY_RECM (1 << 1)
/* The next bytes to consume belong to the reply XID. */
#define XPRT_COPY_XID (1 << 2)
/* Reply payload is being copied into the request matching the XID. */
#define XPRT_COPY_DATA (1 << 3)
struct rpc_xprt {
struct socket * sock; /* BSD socket layer */
struct sock * inet; /* INET layer */
......@@ -140,18 +145,17 @@ struct rpc_xprt {
unsigned long sockstate; /* Socket state */
unsigned char shutdown : 1, /* being shut down */
nocong : 1, /* no congestion control */
stream : 1, /* TCP */
tcp_more : 1; /* more record fragments */
stream : 1; /* TCP */
/*
* State of TCP reply receive stuff
*/
u32 tcp_recm; /* Fragment header */
u32 tcp_xid; /* Current XID */
unsigned int tcp_reclen, /* fragment length */
tcp_offset, /* fragment offset */
tcp_copied; /* copied to request */
struct list_head rx_pending; /* receive pending list */
u32 tcp_recm, /* Fragment header */
tcp_xid, /* Current XID */
tcp_reclen, /* fragment length */
tcp_offset; /* fragment offset */
unsigned long tcp_copied, /* copied to request */
tcp_flags;
/*
* Send stuff
......@@ -185,8 +189,6 @@ int xprt_adjust_timeout(struct rpc_timeout *);
void xprt_release(struct rpc_task *);
void xprt_reconnect(struct rpc_task *);
int xprt_clear_backlog(struct rpc_xprt *);
int xprt_tcp_pending(void);
void __rpciod_tcp_dispatcher(void);
#define XPRT_WSPACE 0
#define XPRT_CONNECT 1
......@@ -200,13 +202,6 @@ void __rpciod_tcp_dispatcher(void);
#define xprt_test_and_set_connected(xp) (test_and_set_bit(XPRT_CONNECT, &(xp)->sockstate))
#define xprt_clear_connected(xp) (clear_bit(XPRT_CONNECT, &(xp)->sockstate))
/*
 * Enter the TCP dispatcher only when xprt_tcp_pending() reports that
 * there is something to process; otherwise do nothing.
 */
static inline
void rpciod_tcp_dispatcher(void)
{
if (xprt_tcp_pending())
__rpciod_tcp_dispatcher();
}
#endif /* __KERNEL__*/
#endif /* _LINUX_SUNRPC_XPRT_H */
......@@ -704,9 +704,6 @@ __rpc_schedule(void)
dprintk("RPC: rpc_schedule enter\n");
while (1) {
/* Ensure equal rights for tcp tasks... */
rpciod_tcp_dispatcher();
spin_lock_bh(&rpc_queue_lock);
task_for_first(task, &schedq.tasks) {
......@@ -1030,7 +1027,7 @@ static DECLARE_MUTEX_LOCKED(rpciod_running);
static inline int
rpciod_task_pending(void)
{
return !list_empty(&schedq.tasks) || xprt_tcp_pending();
return !list_empty(&schedq.tasks);
}
......
......@@ -64,6 +64,7 @@
#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>
#include <asm/uaccess.h>
......@@ -88,7 +89,6 @@ static void xprt_disconnect(struct rpc_xprt *);
static void xprt_reconn_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct rpc_timeout *);
static int xprt_bind_socket(struct rpc_xprt *, struct socket *);
static void xprt_remove_pending(struct rpc_xprt *);
#ifdef RPC_DEBUG_DATA
/*
......@@ -269,43 +269,6 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
return result;
}
/*
 * Read data from socket
 *
 * Builds a msghdr around the nr-entry vector iov and passes it to
 * sock_recvmsg() with MSG_DONTWAIT, asking for at most len bytes.
 * A non-zero shift is forwarded to xprt_move_iov() together with the
 * local scratch vector niv[].
 *
 * Returns -ENOTCONN when the transport has no socket, otherwise the
 * value returned by sock_recvmsg().
 */
static int
xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, unsigned len, unsigned shift)
{
struct socket *sock = xprt->sock;
struct msghdr msg;
mm_segment_t oldfs;
/* Scratch vector handed to xprt_move_iov() when shift is non-zero. */
struct iovec niv[MAX_IOVEC];
int result;
if (!sock)
return -ENOTCONN;
msg.msg_flags = MSG_DONTWAIT|MSG_NOSIGNAL;
msg.msg_iov = iov;
msg.msg_iovlen = nr;
/* No peer address and no ancillary data are requested. */
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_control = NULL;
msg.msg_controllen = 0;
/* Adjust the iovec if we've already filled it */
if (shift)
xprt_move_iov(&msg, niv, shift);
/*
 * Switch the address limit to get_ds() for the duration of the call and
 * restore the saved value afterwards (presumably because the iovec
 * points at kernel buffers -- confirm against callers).
 */
oldfs = get_fs(); set_fs(get_ds());
result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
set_fs(oldfs);
dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n",
iov, len, result);
return result;
}
/*
* Adjust RPC congestion window
* We use a time-smoothed congestion estimator to avoid heavy oscillation.
......@@ -423,7 +386,6 @@ xprt_disconnect(struct rpc_xprt *xprt)
{
dprintk("RPC: disconnected transport %p\n", xprt);
xprt_clear_connected(xprt);
xprt_remove_pending(xprt);
rpc_wake_up_status(&xprt->pending, -ENOTCONN);
}
......@@ -473,7 +435,7 @@ xprt_reconnect(struct rpc_task *task)
xprt->tcp_offset = 0;
xprt->tcp_reclen = 0;
xprt->tcp_copied = 0;
xprt->tcp_more = 0;
xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
/* Now connect it asynchronously. */
dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
......@@ -716,309 +678,229 @@ udp_data_ready(struct sock *sk, int len)
wake_up_interruptible(sk->sleep);
}
/*
 * Read cursor over an skb.  Consuming (copying or skipping) data moves
 * `offset` forward and shrinks `count` by the same number of bytes.
 */
typedef struct {
	struct sk_buff	*skb;		/* buffer the data is read from */
	unsigned int	offset;		/* position of the next unread byte in skb */
	size_t		count;		/* bytes still available to the reader */
} skb_reader_t;
/*
* TCP read fragment marker
* Copy from an skb into memory and shrink the skb.
*/
static inline int
tcp_read_fraghdr(struct rpc_xprt *xprt)
static inline size_t
tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
{
struct iovec riov;
int want, result;
if (xprt->tcp_offset >= sizeof(xprt->tcp_recm))
goto done;
want = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
dprintk("RPC: reading header (%d bytes)\n", want);
do {
riov.iov_base = ((u8*) &xprt->tcp_recm) + xprt->tcp_offset;
riov.iov_len = want;
result = xprt_recvmsg(xprt, &riov, 1, want, 0);
if (result < 0)
return result;
xprt->tcp_offset += result;
want -= result;
} while (want);
if (len > desc->count)
len = desc->count;
skb_copy_bits(desc->skb, desc->offset, p, len);
desc->offset += len;
desc->count -= len;
return len;
}
/* Get the record length and mask out the last fragment bit */
/*
* TCP read fragment marker
*/
static inline void
tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
{
size_t len, used;
char *p;
p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
used = tcp_copy_data(desc, p, len);
xprt->tcp_offset += used;
if (used != len)
return;
xprt->tcp_reclen = ntohl(xprt->tcp_recm);
xprt->tcp_more = (xprt->tcp_reclen & 0x80000000) ? 0 : 1;
if (xprt->tcp_reclen & 0x80000000)
xprt->tcp_flags |= XPRT_LAST_FRAG;
else
xprt->tcp_flags &= ~XPRT_LAST_FRAG;
xprt->tcp_reclen &= 0x7fffffff;
xprt->tcp_flags &= ~XPRT_COPY_RECM;
xprt->tcp_offset = 0;
/* Sanity check of the record length */
if (xprt->tcp_reclen < 4) {
printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
xprt_disconnect(xprt);
}
dprintk("RPC: reading TCP record fragment of length %d\n",
xprt->tcp_reclen);
}
dprintk("RPC: New record reclen %d morefrags %d\n",
xprt->tcp_reclen, xprt->tcp_more);
done:
return xprt->tcp_reclen + sizeof(xprt->tcp_recm) - xprt->tcp_offset;
/*
 * Called after tcp_offset has advanced.  Once the whole fragment has
 * been consumed, arm the state machine to read a new fragment header;
 * if that fragment was flagged as the last one, also reset the
 * per-record state so that the next thing read is an XID.
 */
static void
tcp_check_recm(struct rpc_xprt *xprt)
{
	/* Still inside the current fragment: nothing to reset. */
	if (xprt->tcp_offset != xprt->tcp_reclen)
		return;

	/* Fragment fully consumed: a fragment header comes next. */
	xprt->tcp_flags |= XPRT_COPY_RECM;
	xprt->tcp_offset = 0;

	/* More fragments of the same record follow: keep the copy state. */
	if (!(xprt->tcp_flags & XPRT_LAST_FRAG))
		return;

	/* Record finished: stop copying data and expect a fresh XID. */
	xprt->tcp_flags &= ~XPRT_COPY_DATA;
	xprt->tcp_flags |= XPRT_COPY_XID;
	xprt->tcp_copied = 0;
}
/*
* TCP read xid
*/
static inline int
tcp_read_xid(struct rpc_xprt *xprt, int avail)
static inline void
tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
{
struct iovec riov;
int want, result;
if (xprt->tcp_copied >= sizeof(xprt->tcp_xid) || !avail)
goto done;
want = min_t(unsigned int, sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail);
do {
dprintk("RPC: reading xid (%d bytes)\n", want);
riov.iov_base = ((u8*) &xprt->tcp_xid) + xprt->tcp_copied;
riov.iov_len = want;
result = xprt_recvmsg(xprt, &riov, 1, want, 0);
if (result < 0)
return result;
xprt->tcp_copied += result;
xprt->tcp_offset += result;
want -= result;
avail -= result;
} while (want);
done:
return avail;
size_t len, used;
char *p;
len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
dprintk("RPC: reading XID (%Zu bytes)\n", len);
p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
used = tcp_copy_data(desc, p, len);
xprt->tcp_offset += used;
if (used != len)
return;
xprt->tcp_flags &= ~XPRT_COPY_XID;
xprt->tcp_flags |= XPRT_COPY_DATA;
xprt->tcp_copied = 4;
dprintk("RPC: reading reply for XID %08x\n", xprt->tcp_xid);
tcp_check_recm(xprt);
}
/*
* TCP read and complete request
*/
static inline int
tcp_read_request(struct rpc_xprt *xprt, struct rpc_rqst *req, int avail)
static inline void
tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{
int want, result;
struct rpc_rqst *req;
struct iovec *iov;
char *p;
unsigned long skip;
size_t len, used;
int n;
if (req->rq_rlen <= xprt->tcp_copied || !avail)
goto done;
want = min_t(unsigned int, req->rq_rlen - xprt->tcp_copied, avail);
do {
dprintk("RPC: %4d TCP receiving %d bytes\n",
req->rq_task->tk_pid, want);
/* Find and lock the request corresponding to this xid */
req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
if (!req) {
xprt->tcp_flags &= ~XPRT_COPY_DATA;
dprintk("RPC: XID %08x request not found!\n",
xprt->tcp_xid);
return;
}
skip = xprt->tcp_copied;
iov = req->rq_rvec;
for (n = req->rq_rnr; n != 0; n--, iov++) {
if (skip >= iov->iov_len) {
skip -= iov->iov_len;
continue;
}
p = iov->iov_base;
len = iov->iov_len;
if (skip) {
p += skip;
len -= skip;
skip = 0;
}
if (xprt->tcp_offset + len > xprt->tcp_reclen)
len = xprt->tcp_reclen - xprt->tcp_offset;
used = tcp_copy_data(desc, p, len);
xprt->tcp_copied += used;
xprt->tcp_offset += used;
if (used != len)
break;
if (xprt->tcp_copied == req->rq_rlen) {
xprt->tcp_flags &= ~XPRT_COPY_DATA;
break;
}
if (xprt->tcp_offset == xprt->tcp_reclen) {
if (xprt->tcp_flags & XPRT_LAST_FRAG)
xprt->tcp_flags &= ~XPRT_COPY_DATA;
break;
}
}
result = xprt_recvmsg(xprt, req->rq_rvec, req->rq_rnr, want, xprt->tcp_copied);
if (result < 0)
return result;
xprt->tcp_copied += result;
xprt->tcp_offset += result;
avail -= result;
want -= result;
} while (want);
done:
if (req->rq_rlen > xprt->tcp_copied && xprt->tcp_more)
return avail;
dprintk("RPC: %4d received reply complete\n", req->rq_task->tk_pid);
if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
dprintk("RPC: %4d received reply complete\n",
req->rq_task->tk_pid);
xprt_complete_rqst(xprt, req, xprt->tcp_copied);
return avail;
}
rpc_unlock_task(req->rq_task);
tcp_check_recm(xprt);
}
/*
* TCP discard extra bytes from a short read
*/
static inline int
tcp_read_discard(struct rpc_xprt *xprt, int avail)
static inline void
tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
{
struct iovec riov;
static u8 dummy[64];
int want, result = 0;
while (avail) {
want = min_t(unsigned int, avail, sizeof(dummy));
riov.iov_base = dummy;
riov.iov_len = want;
dprintk("RPC: TCP skipping %d bytes\n", want);
result = xprt_recvmsg(xprt, &riov, 1, want, 0);
if (result < 0)
return result;
xprt->tcp_offset += result;
avail -= result;
}
return avail;
size_t len;
len = xprt->tcp_reclen - xprt->tcp_offset;
if (len > desc->count)
len = desc->count;
desc->count -= len;
desc->offset += len;
xprt->tcp_offset += len;
tcp_check_recm(xprt);
}
/*
* TCP record receive routine
* This is not the most efficient code since we call recvfrom thrice--
* first receiving the record marker, then the XID, then the data.
*
* The optimal solution would be a RPC support in the TCP layer, which
* would gather all data up to the next record marker and then pass us
* the list of all TCP segments ready to be copied.
* We first have to grab the record marker, then the XID, then the data.
*/
static int
tcp_input_record(struct rpc_xprt *xprt)
tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
unsigned int offset, size_t len)
{
struct rpc_rqst *req = NULL;
struct rpc_task *task = NULL;
int avail, result;
dprintk("RPC: tcp_input_record\n");
if (xprt->shutdown)
return -EIO;
if (!xprt_connected(xprt))
return -ENOTCONN;
struct rpc_xprt *xprt = (struct rpc_xprt *)rd_desc->buf;
skb_reader_t desc = { skb, offset, len };
dprintk("RPC: tcp_data_recv\n");
do {
/* Read in a new fragment marker if necessary */
/* Can we ever really expect to get completely empty fragments? */
if ((result = tcp_read_fraghdr(xprt)) < 0)
return result;
avail = result;
if (xprt->tcp_flags & XPRT_COPY_RECM) {
tcp_read_fraghdr(xprt, &desc);
continue;
}
/* Read in the xid if necessary */
if ((result = tcp_read_xid(xprt, avail)) < 0)
return result;
if (!(avail = result))
goto out_ok;
/* Find and lock the request corresponding to this xid */
req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
if (req) {
task = req->rq_task;
if (xprt->tcp_flags & XPRT_COPY_XID) {
tcp_read_xid(xprt, &desc);
continue;
}
/* Read in the request data */
result = tcp_read_request(xprt, req, avail);
rpc_unlock_task(task);
if (result < 0)
return result;
avail = result;
if (xprt->tcp_flags & XPRT_COPY_DATA) {
tcp_read_request(xprt, &desc);
continue;
}
/* Skip over any trailing bytes on short reads */
if ((result = tcp_read_discard(xprt, avail)) < 0)
return result;
out_ok:
dprintk("RPC: tcp_input_record done (off %d reclen %d copied %d)\n",
xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_copied);
result = xprt->tcp_reclen;
xprt->tcp_reclen = 0;
xprt->tcp_offset = 0;
if (!xprt->tcp_more)
xprt->tcp_copied = 0;
return result;
}
/*
* TCP task queue stuff
*/
LIST_HEAD(rpc_xprt_pending); /* List of xprts having pending tcp requests */
/*
 * Thin wrapper: wake rpciod via rpciod_wake_up().
 */
static inline
void tcp_rpciod_queue(void)
{
rpciod_wake_up();
}
/*
 * Report whether any transport is queued on rpc_xprt_pending.
 * Returns 1 if the list is non-empty, 0 otherwise.  The list is
 * sampled while holding rpc_queue_lock (taken with spin_lock_bh).
 */
int xprt_tcp_pending(void)
{
int retval;
spin_lock_bh(&rpc_queue_lock);
retval = !list_empty(&rpc_xprt_pending);
spin_unlock_bh(&rpc_queue_lock);
return retval;
}
/*
 * Queue a transport on rpc_xprt_pending and wake rpciod.
 * A transport whose rx_pending node is already linked is left alone,
 * so it is never queued twice.  Runs under rpc_queue_lock.
 */
static inline
void xprt_append_pending(struct rpc_xprt *xprt)
{
spin_lock_bh(&rpc_queue_lock);
/* An empty (self-linked) rx_pending node means "not currently queued". */
if (list_empty(&xprt->rx_pending)) {
/* Insert after the current last entry, i.e. at the tail of the list. */
list_add(&xprt->rx_pending, rpc_xprt_pending.prev);
dprintk("RPC: xprt queue %p\n", xprt);
tcp_rpciod_queue();
}
spin_unlock_bh(&rpc_queue_lock);
}
static
void xprt_remove_pending(struct rpc_xprt *xprt)
{
spin_lock_bh(&rpc_queue_lock);
if (!list_empty(&xprt->rx_pending)) {
list_del(&xprt->rx_pending);
INIT_LIST_HEAD(&xprt->rx_pending);
}
spin_unlock_bh(&rpc_queue_lock);
tcp_read_discard(xprt, &desc);
} while (desc.count && xprt_connected(xprt));
dprintk("RPC: tcp_data_recv done\n");
return len - desc.count;
}
/*
 * Detach and return the first transport on rpc_xprt_pending, or NULL
 * when the list is empty.  Runs under rpc_queue_lock.
 */
static inline
struct rpc_xprt *xprt_remove_pending_next(void)
{
struct rpc_xprt *xprt = NULL;
spin_lock_bh(&rpc_queue_lock);
if (!list_empty(&rpc_xprt_pending)) {
xprt = list_entry(rpc_xprt_pending.next, struct rpc_xprt, rx_pending);
list_del(&xprt->rx_pending);
/* Re-initialise the node so list_empty() on it is true again. */
INIT_LIST_HEAD(&xprt->rx_pending);
}
spin_unlock_bh(&rpc_queue_lock);
return xprt;
}
/*
 * Drain the list of transports with pending TCP input.
 *
 * This is protected from tcp_data_ready and the stack as it is run
 * inside of the RPC I/O daemon.
 *
 * Each transport taken off the pending list has tcp_input_record()
 * called on it repeatedly until that returns a negative value.
 */
void
__rpciod_tcp_dispatcher(void)
{
struct rpc_xprt *xprt;
int safe_retry = 0, result;
dprintk("rpciod_tcp_dispatcher: Queue Running\n");
/*
 * Empty each pending socket
 */
while ((xprt = xprt_remove_pending_next()) != NULL) {
dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);
do {
result = tcp_input_record(xprt);
} while (result >= 0);
/* Yield via schedule() once the counter has passed 200 transports, then restart the count. */
if (safe_retry++ > 200) {
schedule();
safe_retry = 0;
}
}
}
/*
* data_ready callback for TCP. We can't just jump into the
* tcp recvmsg functions inside of the network receive bh or
* bad things occur. We queue it to pick up after networking
* is done.
*/
static void tcp_data_ready(struct sock *sk, int len)
static void tcp_data_ready(struct sock *sk, int bytes)
{
struct rpc_xprt *xprt;
read_descriptor_t rd_desc;
dprintk("RPC: tcp_data_ready...\n");
if (!(xprt = xprt_from_sock(sk)))
{
printk("Not a socket with xprt %p\n", sk);
goto out;
if (!(xprt = xprt_from_sock(sk))) {
printk("RPC: tcp_data_ready socket info not found!\n");
return;
}
if (xprt->shutdown)
goto out;
xprt_append_pending(xprt);
return;
dprintk("RPC: tcp_data_ready client %p\n", xprt);
dprintk("RPC: state %x conn %d dead %d zapped %d\n",
sk->state, xprt_connected(xprt),
sk->dead, sk->zapped);
out:
if (sk->sleep && waitqueue_active(sk->sleep))
wake_up_interruptible(sk->sleep);
/* We use rd_desc to pass struct xprt to tcp_data_recv */
rd_desc.buf = (char *)xprt;
rd_desc.count = 65536;
tcp_read_sock(sk, &rd_desc, tcp_data_recv);
}
static void
tcp_state_change(struct sock *sk)
{
......@@ -1483,8 +1365,6 @@ xprt_setup(struct socket *sock, int proto,
req->rq_next = NULL;
xprt->free = xprt->slot;
INIT_LIST_HEAD(&xprt->rx_pending);
dprintk("RPC: created transport %p\n", xprt);
xprt_bind_socket(xprt, sock);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment