Commit ab91d13d authored by Trond Myklebust's avatar Trond Myklebust

RPC: Ensure that we disconnect TCP sockets if there has been no

NFS traffic for the last 5 minutes. This code also affects
NFSv2/v3.
parent 2642498f
...@@ -163,6 +163,12 @@ struct rpc_xprt { ...@@ -163,6 +163,12 @@ struct rpc_xprt {
tcp_offset; /* fragment offset */ tcp_offset; /* fragment offset */
unsigned long tcp_copied, /* copied to request */ unsigned long tcp_copied, /* copied to request */
tcp_flags; tcp_flags;
/*
* Disconnection of idle sockets
*/
struct work_struct task_cleanup;
struct timer_list timer;
unsigned long last_used;
/* /*
* Send stuff * Send stuff
...@@ -202,6 +208,7 @@ int xprt_clear_backlog(struct rpc_xprt *); ...@@ -202,6 +208,7 @@ int xprt_clear_backlog(struct rpc_xprt *);
void xprt_sock_setbufsize(struct rpc_xprt *); void xprt_sock_setbufsize(struct rpc_xprt *);
#define XPRT_CONNECT 0 #define XPRT_CONNECT 0
#define XPRT_LOCKED 1
#define xprt_connected(xp) (test_bit(XPRT_CONNECT, &(xp)->sockstate)) #define xprt_connected(xp) (test_bit(XPRT_CONNECT, &(xp)->sockstate))
#define xprt_set_connected(xp) (set_bit(XPRT_CONNECT, &(xp)->sockstate)) #define xprt_set_connected(xp) (set_bit(XPRT_CONNECT, &(xp)->sockstate))
......
...@@ -59,6 +59,7 @@ ...@@ -59,6 +59,7 @@
#include <linux/unistd.h> #include <linux/unistd.h>
#include <linux/sunrpc/clnt.h> #include <linux/sunrpc/clnt.h>
#include <linux/file.h> #include <linux/file.h>
#include <linux/workqueue.h>
#include <net/sock.h> #include <net/sock.h>
#include <net/checksum.h> #include <net/checksum.h>
...@@ -75,6 +76,7 @@ ...@@ -75,6 +76,7 @@
#endif #endif
#define XPRT_MAX_BACKOFF (8) #define XPRT_MAX_BACKOFF (8)
#define XPRT_IDLE_TIMEOUT (5*60*HZ)
/* /*
* Local functions * Local functions
...@@ -139,25 +141,33 @@ __xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) ...@@ -139,25 +141,33 @@ __xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{ {
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
if (!xprt->snd_task) { if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) {
if (task == xprt->snd_task)
return 1;
if (task == NULL)
return 0;
goto out_sleep;
}
if (xprt->nocong || __xprt_get_cong(xprt, task)) { if (xprt->nocong || __xprt_get_cong(xprt, task)) {
xprt->snd_task = task; xprt->snd_task = task;
if (req) { if (req) {
req->rq_bytes_sent = 0; req->rq_bytes_sent = 0;
req->rq_ntrans++; req->rq_ntrans++;
} }
return 1;
} }
} smp_mb__before_clear_bit();
if (xprt->snd_task != task) { clear_bit(XPRT_LOCKED, &xprt->sockstate);
dprintk("RPC: %4d TCP write queue full\n", task->tk_pid); smp_mb__after_clear_bit();
out_sleep:
dprintk("RPC: %4d failed to lock socket %p\n", task->tk_pid, xprt);
task->tk_timeout = 0; task->tk_timeout = 0;
task->tk_status = -EAGAIN; task->tk_status = -EAGAIN;
if (req && req->rq_ntrans) if (req && req->rq_ntrans)
rpc_sleep_on(&xprt->resend, task, NULL, NULL); rpc_sleep_on(&xprt->resend, task, NULL, NULL);
else else
rpc_sleep_on(&xprt->sending, task, NULL, NULL); rpc_sleep_on(&xprt->sending, task, NULL, NULL);
} return 0;
return xprt->snd_task == task;
} }
static inline int static inline int
...@@ -177,15 +187,15 @@ __xprt_lock_write_next(struct rpc_xprt *xprt) ...@@ -177,15 +187,15 @@ __xprt_lock_write_next(struct rpc_xprt *xprt)
{ {
struct rpc_task *task; struct rpc_task *task;
if (xprt->snd_task) if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
return; return;
if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
goto out_unlock;
task = rpc_wake_up_next(&xprt->resend); task = rpc_wake_up_next(&xprt->resend);
if (!task) { if (!task) {
if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
return;
task = rpc_wake_up_next(&xprt->sending); task = rpc_wake_up_next(&xprt->sending);
if (!task) if (!task)
return; goto out_unlock;
} }
if (xprt->nocong || __xprt_get_cong(xprt, task)) { if (xprt->nocong || __xprt_get_cong(xprt, task)) {
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
...@@ -194,7 +204,12 @@ __xprt_lock_write_next(struct rpc_xprt *xprt) ...@@ -194,7 +204,12 @@ __xprt_lock_write_next(struct rpc_xprt *xprt)
req->rq_bytes_sent = 0; req->rq_bytes_sent = 0;
req->rq_ntrans++; req->rq_ntrans++;
} }
return;
} }
out_unlock:
smp_mb__before_clear_bit();
clear_bit(XPRT_LOCKED, &xprt->sockstate);
smp_mb__after_clear_bit();
} }
/* /*
...@@ -203,9 +218,13 @@ __xprt_lock_write_next(struct rpc_xprt *xprt) ...@@ -203,9 +218,13 @@ __xprt_lock_write_next(struct rpc_xprt *xprt)
static void static void
__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) __xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{ {
if (xprt->snd_task == task) if (xprt->snd_task == task) {
xprt->snd_task = NULL; xprt->snd_task = NULL;
smp_mb__before_clear_bit();
clear_bit(XPRT_LOCKED, &xprt->sockstate);
smp_mb__after_clear_bit();
__xprt_lock_write_next(xprt); __xprt_lock_write_next(xprt);
}
} }
static inline void static inline void
...@@ -393,6 +412,15 @@ xprt_close(struct rpc_xprt *xprt) ...@@ -393,6 +412,15 @@ xprt_close(struct rpc_xprt *xprt)
sock_release(sock); sock_release(sock);
} }
static void
xprt_socket_autoclose(void *args)
{
struct rpc_xprt *xprt = (struct rpc_xprt *)args;
xprt_close(xprt);
xprt_release_write(xprt, NULL);
}
/* /*
* Mark a transport as disconnected * Mark a transport as disconnected
*/ */
...@@ -406,6 +434,27 @@ xprt_disconnect(struct rpc_xprt *xprt) ...@@ -406,6 +434,27 @@ xprt_disconnect(struct rpc_xprt *xprt)
spin_unlock_bh(&xprt->sock_lock); spin_unlock_bh(&xprt->sock_lock);
} }
/*
* Used to allow disconnection when we've been idle
*/
static void
xprt_init_autodisconnect(unsigned long data)
{
struct rpc_xprt *xprt = (struct rpc_xprt *)data;
spin_lock(&xprt->sock_lock);
if (!list_empty(&xprt->recv) || xprt->shutdown)
goto out_abort;
if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
goto out_abort;
spin_unlock(&xprt->sock_lock);
/* Let keventd close the socket */
schedule_work(&xprt->task_cleanup);
return;
out_abort:
spin_unlock(&xprt->sock_lock);
}
/* /*
* Attempt to connect a TCP socket. * Attempt to connect a TCP socket.
* *
...@@ -1254,6 +1303,8 @@ xprt_reserve(struct rpc_task *task) ...@@ -1254,6 +1303,8 @@ xprt_reserve(struct rpc_task *task)
spin_lock(&xprt->xprt_lock); spin_lock(&xprt->xprt_lock);
do_xprt_reserve(task); do_xprt_reserve(task);
spin_unlock(&xprt->xprt_lock); spin_unlock(&xprt->xprt_lock);
if (task->tk_rqstp)
del_timer_sync(&xprt->timer);
} }
} }
...@@ -1333,6 +1384,9 @@ xprt_release(struct rpc_task *task) ...@@ -1333,6 +1384,9 @@ xprt_release(struct rpc_task *task)
__xprt_put_cong(xprt, req); __xprt_put_cong(xprt, req);
if (!list_empty(&req->rq_list)) if (!list_empty(&req->rq_list))
list_del(&req->rq_list); list_del(&req->rq_list);
xprt->last_used = jiffies;
if (list_empty(&xprt->recv) && !xprt->shutdown)
mod_timer(&xprt->timer, xprt->last_used + XPRT_IDLE_TIMEOUT);
spin_unlock_bh(&xprt->sock_lock); spin_unlock_bh(&xprt->sock_lock);
task->tk_rqstp = NULL; task->tk_rqstp = NULL;
memset(req, 0, sizeof(*req)); /* mark unused */ memset(req, 0, sizeof(*req)); /* mark unused */
...@@ -1403,6 +1457,11 @@ xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) ...@@ -1403,6 +1457,11 @@ xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
init_waitqueue_head(&xprt->cong_wait); init_waitqueue_head(&xprt->cong_wait);
INIT_LIST_HEAD(&xprt->recv); INIT_LIST_HEAD(&xprt->recv);
INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt);
init_timer(&xprt->timer);
xprt->timer.function = xprt_init_autodisconnect;
xprt->timer.data = (unsigned long) xprt;
xprt->last_used = jiffies;
/* Set timeout parameters */ /* Set timeout parameters */
if (to) { if (to) {
...@@ -1583,6 +1642,7 @@ xprt_shutdown(struct rpc_xprt *xprt) ...@@ -1583,6 +1642,7 @@ xprt_shutdown(struct rpc_xprt *xprt)
rpc_wake_up(&xprt->backlog); rpc_wake_up(&xprt->backlog);
if (waitqueue_active(&xprt->cong_wait)) if (waitqueue_active(&xprt->cong_wait))
wake_up(&xprt->cong_wait); wake_up(&xprt->cong_wait);
del_timer_sync(&xprt->timer);
} }
/* /*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment