Commit 7f94ed24 authored by Trond Myklebust's avatar Trond Myklebust

Merge branch 'sunrpc'

parents 149a4fdd ce272302
...@@ -230,6 +230,10 @@ void rpc_wake_up_queued_task(struct rpc_wait_queue *, ...@@ -230,6 +230,10 @@ void rpc_wake_up_queued_task(struct rpc_wait_queue *,
struct rpc_task *); struct rpc_task *);
void rpc_wake_up(struct rpc_wait_queue *); void rpc_wake_up(struct rpc_wait_queue *);
struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *); struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *);
struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
struct rpc_wait_queue *,
bool (*)(struct rpc_task *, void *),
void *);
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *, struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *,
bool (*)(struct rpc_task *, void *), bool (*)(struct rpc_task *, void *),
void *); void *);
...@@ -247,6 +251,7 @@ void rpc_show_tasks(struct net *); ...@@ -247,6 +251,7 @@ void rpc_show_tasks(struct net *);
int rpc_init_mempool(void); int rpc_init_mempool(void);
void rpc_destroy_mempool(void); void rpc_destroy_mempool(void);
extern struct workqueue_struct *rpciod_workqueue; extern struct workqueue_struct *rpciod_workqueue;
extern struct workqueue_struct *xprtiod_workqueue;
void rpc_prepare_task(struct rpc_task *task); void rpc_prepare_task(struct rpc_task *task);
static inline int rpc_wait_for_completion_task(struct rpc_task *task) static inline int rpc_wait_for_completion_task(struct rpc_task *task)
......
...@@ -80,6 +80,7 @@ struct sock_xprt { ...@@ -80,6 +80,7 @@ struct sock_xprt {
#define TCP_RPC_REPLY (1UL << 6) #define TCP_RPC_REPLY (1UL << 6)
#define XPRT_SOCK_CONNECTING 1U #define XPRT_SOCK_CONNECTING 1U
#define XPRT_SOCK_DATA_READY (2)
#endif /* __KERNEL__ */ #endif /* __KERNEL__ */
......
...@@ -2577,7 +2577,7 @@ static void rpc_cb_add_xprt_release(void *calldata) ...@@ -2577,7 +2577,7 @@ static void rpc_cb_add_xprt_release(void *calldata)
kfree(data); kfree(data);
} }
const static struct rpc_call_ops rpc_cb_add_xprt_call_ops = { static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
.rpc_call_done = rpc_cb_add_xprt_done, .rpc_call_done = rpc_cb_add_xprt_done,
.rpc_release = rpc_cb_add_xprt_release, .rpc_release = rpc_cb_add_xprt_release,
}; };
......
...@@ -54,7 +54,8 @@ static struct rpc_wait_queue delay_queue; ...@@ -54,7 +54,8 @@ static struct rpc_wait_queue delay_queue;
/* /*
* rpciod-related stuff * rpciod-related stuff
*/ */
struct workqueue_struct *rpciod_workqueue; struct workqueue_struct *rpciod_workqueue __read_mostly;
struct workqueue_struct *xprtiod_workqueue __read_mostly;
/* /*
* Disable the timer for a given RPC task. Should be called with * Disable the timer for a given RPC task. Should be called with
...@@ -329,7 +330,8 @@ EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task); ...@@ -329,7 +330,8 @@ EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
* lockless RPC_IS_QUEUED() test) before we've had a chance to test * lockless RPC_IS_QUEUED() test) before we've had a chance to test
* the RPC_TASK_RUNNING flag. * the RPC_TASK_RUNNING flag.
*/ */
static void rpc_make_runnable(struct rpc_task *task) static void rpc_make_runnable(struct workqueue_struct *wq,
struct rpc_task *task)
{ {
bool need_wakeup = !rpc_test_and_set_running(task); bool need_wakeup = !rpc_test_and_set_running(task);
...@@ -338,7 +340,7 @@ static void rpc_make_runnable(struct rpc_task *task) ...@@ -338,7 +340,7 @@ static void rpc_make_runnable(struct rpc_task *task)
return; return;
if (RPC_IS_ASYNC(task)) { if (RPC_IS_ASYNC(task)) {
INIT_WORK(&task->u.tk_work, rpc_async_schedule); INIT_WORK(&task->u.tk_work, rpc_async_schedule);
queue_work(rpciod_workqueue, &task->u.tk_work); queue_work(wq, &task->u.tk_work);
} else } else
wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED); wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
} }
...@@ -407,13 +409,16 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task, ...@@ -407,13 +409,16 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
EXPORT_SYMBOL_GPL(rpc_sleep_on_priority); EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);
/** /**
* __rpc_do_wake_up_task - wake up a single rpc_task * __rpc_do_wake_up_task_on_wq - wake up a single rpc_task
* @wq: workqueue on which to run task
* @queue: wait queue * @queue: wait queue
* @task: task to be woken up * @task: task to be woken up
* *
* Caller must hold queue->lock, and have cleared the task queued flag. * Caller must hold queue->lock, and have cleared the task queued flag.
*/ */
static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task) static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq,
struct rpc_wait_queue *queue,
struct rpc_task *task)
{ {
dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n", dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
task->tk_pid, jiffies); task->tk_pid, jiffies);
...@@ -428,7 +433,7 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task ...@@ -428,7 +433,7 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task
__rpc_remove_wait_queue(queue, task); __rpc_remove_wait_queue(queue, task);
rpc_make_runnable(task); rpc_make_runnable(wq, task);
dprintk("RPC: __rpc_wake_up_task done\n"); dprintk("RPC: __rpc_wake_up_task done\n");
} }
...@@ -436,15 +441,24 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task ...@@ -436,15 +441,24 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task
/* /*
* Wake up a queued task while the queue lock is being held * Wake up a queued task while the queue lock is being held
*/ */
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task) static void rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq,
struct rpc_wait_queue *queue, struct rpc_task *task)
{ {
if (RPC_IS_QUEUED(task)) { if (RPC_IS_QUEUED(task)) {
smp_rmb(); smp_rmb();
if (task->tk_waitqueue == queue) if (task->tk_waitqueue == queue)
__rpc_do_wake_up_task(queue, task); __rpc_do_wake_up_task_on_wq(wq, queue, task);
} }
} }
/*
* Wake up a queued task while the queue lock is being held
*/
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
{
rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task);
}
/* /*
* Wake up a task on a specific queue * Wake up a task on a specific queue
*/ */
...@@ -518,7 +532,8 @@ static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue) ...@@ -518,7 +532,8 @@ static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
/* /*
* Wake up the first task on the wait queue. * Wake up the first task on the wait queue.
*/ */
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue, struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
struct rpc_wait_queue *queue,
bool (*func)(struct rpc_task *, void *), void *data) bool (*func)(struct rpc_task *, void *), void *data)
{ {
struct rpc_task *task = NULL; struct rpc_task *task = NULL;
...@@ -529,7 +544,7 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue, ...@@ -529,7 +544,7 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
task = __rpc_find_next_queued(queue); task = __rpc_find_next_queued(queue);
if (task != NULL) { if (task != NULL) {
if (func(task, data)) if (func(task, data))
rpc_wake_up_task_queue_locked(queue, task); rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
else else
task = NULL; task = NULL;
} }
...@@ -537,6 +552,15 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue, ...@@ -537,6 +552,15 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
return task; return task;
} }
/*
* Wake up the first task on the wait queue.
*/
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
bool (*func)(struct rpc_task *, void *), void *data)
{
return rpc_wake_up_first_on_wq(rpciod_workqueue, queue, func, data);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_first); EXPORT_SYMBOL_GPL(rpc_wake_up_first);
static bool rpc_wake_up_next_func(struct rpc_task *task, void *data) static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
...@@ -814,7 +838,7 @@ void rpc_execute(struct rpc_task *task) ...@@ -814,7 +838,7 @@ void rpc_execute(struct rpc_task *task)
bool is_async = RPC_IS_ASYNC(task); bool is_async = RPC_IS_ASYNC(task);
rpc_set_active(task); rpc_set_active(task);
rpc_make_runnable(task); rpc_make_runnable(rpciod_workqueue, task);
if (!is_async) if (!is_async)
__rpc_execute(task); __rpc_execute(task);
} }
...@@ -1071,10 +1095,22 @@ static int rpciod_start(void) ...@@ -1071,10 +1095,22 @@ static int rpciod_start(void)
* Create the rpciod thread and wait for it to start. * Create the rpciod thread and wait for it to start.
*/ */
dprintk("RPC: creating workqueue rpciod\n"); dprintk("RPC: creating workqueue rpciod\n");
/* Note: highpri because network receive is latency sensitive */ wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0); if (!wq)
goto out_failed;
rpciod_workqueue = wq; rpciod_workqueue = wq;
return rpciod_workqueue != NULL; /* Note: highpri because network receive is latency sensitive */
wq = alloc_workqueue("xprtiod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
if (!wq)
goto free_rpciod;
xprtiod_workqueue = wq;
return 1;
free_rpciod:
wq = rpciod_workqueue;
rpciod_workqueue = NULL;
destroy_workqueue(wq);
out_failed:
return 0;
} }
static void rpciod_stop(void) static void rpciod_stop(void)
...@@ -1088,6 +1124,9 @@ static void rpciod_stop(void) ...@@ -1088,6 +1124,9 @@ static void rpciod_stop(void)
wq = rpciod_workqueue; wq = rpciod_workqueue;
rpciod_workqueue = NULL; rpciod_workqueue = NULL;
destroy_workqueue(wq); destroy_workqueue(wq);
wq = xprtiod_workqueue;
xprtiod_workqueue = NULL;
destroy_workqueue(wq);
} }
void void
......
...@@ -220,7 +220,7 @@ static void xprt_clear_locked(struct rpc_xprt *xprt) ...@@ -220,7 +220,7 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)
clear_bit(XPRT_LOCKED, &xprt->state); clear_bit(XPRT_LOCKED, &xprt->state);
smp_mb__after_atomic(); smp_mb__after_atomic();
} else } else
queue_work(rpciod_workqueue, &xprt->task_cleanup); queue_work(xprtiod_workqueue, &xprt->task_cleanup);
} }
/* /*
...@@ -295,7 +295,8 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt) ...@@ -295,7 +295,8 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
return; return;
if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt)) if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
__xprt_lock_write_func, xprt))
return; return;
xprt_clear_locked(xprt); xprt_clear_locked(xprt);
} }
...@@ -324,7 +325,8 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) ...@@ -324,7 +325,8 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
return; return;
if (RPCXPRT_CONGESTED(xprt)) if (RPCXPRT_CONGESTED(xprt))
goto out_unlock; goto out_unlock;
if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt)) if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
__xprt_lock_write_cong_func, xprt))
return; return;
out_unlock: out_unlock:
xprt_clear_locked(xprt); xprt_clear_locked(xprt);
...@@ -645,7 +647,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt) ...@@ -645,7 +647,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
set_bit(XPRT_CLOSE_WAIT, &xprt->state); set_bit(XPRT_CLOSE_WAIT, &xprt->state);
/* Try to schedule an autoclose RPC call */ /* Try to schedule an autoclose RPC call */
if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
queue_work(rpciod_workqueue, &xprt->task_cleanup); queue_work(xprtiod_workqueue, &xprt->task_cleanup);
xprt_wake_pending_tasks(xprt, -EAGAIN); xprt_wake_pending_tasks(xprt, -EAGAIN);
spin_unlock_bh(&xprt->transport_lock); spin_unlock_bh(&xprt->transport_lock);
} }
...@@ -672,7 +674,7 @@ void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) ...@@ -672,7 +674,7 @@ void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
set_bit(XPRT_CLOSE_WAIT, &xprt->state); set_bit(XPRT_CLOSE_WAIT, &xprt->state);
/* Try to schedule an autoclose RPC call */ /* Try to schedule an autoclose RPC call */
if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
queue_work(rpciod_workqueue, &xprt->task_cleanup); queue_work(xprtiod_workqueue, &xprt->task_cleanup);
xprt_wake_pending_tasks(xprt, -EAGAIN); xprt_wake_pending_tasks(xprt, -EAGAIN);
out: out:
spin_unlock_bh(&xprt->transport_lock); spin_unlock_bh(&xprt->transport_lock);
...@@ -689,7 +691,7 @@ xprt_init_autodisconnect(unsigned long data) ...@@ -689,7 +691,7 @@ xprt_init_autodisconnect(unsigned long data)
if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
goto out_abort; goto out_abort;
spin_unlock(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
queue_work(rpciod_workqueue, &xprt->task_cleanup); queue_work(xprtiod_workqueue, &xprt->task_cleanup);
return; return;
out_abort: out_abort:
spin_unlock(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
......
...@@ -271,14 +271,12 @@ struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi, ...@@ -271,14 +271,12 @@ struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi,
xprt_switch_find_xprt_t find_next) xprt_switch_find_xprt_t find_next)
{ {
struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
struct list_head *head;
if (xps == NULL) if (xps == NULL)
return NULL; return NULL;
head = &xps->xps_xprt_list; return xprt_switch_set_next_cursor(&xps->xps_xprt_list,
if (xps->xps_nxprts < 2) &xpi->xpi_cursor,
return xprt_switch_find_first_entry(head); find_next);
return xprt_switch_set_next_cursor(head, &xpi->xpi_cursor, find_next);
} }
static static
......
...@@ -642,6 +642,7 @@ static int xs_tcp_send_request(struct rpc_task *task) ...@@ -642,6 +642,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
struct xdr_buf *xdr = &req->rq_snd_buf; struct xdr_buf *xdr = &req->rq_snd_buf;
bool zerocopy = true; bool zerocopy = true;
bool vm_wait = false;
int status; int status;
int sent; int sent;
...@@ -677,15 +678,33 @@ static int xs_tcp_send_request(struct rpc_task *task) ...@@ -677,15 +678,33 @@ static int xs_tcp_send_request(struct rpc_task *task)
return 0; return 0;
} }
WARN_ON_ONCE(sent == 0 && status == 0);
if (status == -EAGAIN ) {
/*
* Return EAGAIN if we're sure we're hitting the
* socket send buffer limits.
*/
if (test_bit(SOCK_NOSPACE, &transport->sock->flags))
break;
/*
* Did we hit a memory allocation failure?
*/
if (sent == 0) {
status = -ENOBUFS;
if (vm_wait)
break;
/* Retry, knowing now that we're below the
* socket send buffer limit
*/
vm_wait = true;
}
continue;
}
if (status < 0) if (status < 0)
break; break;
if (sent == 0) { vm_wait = false;
status = -EAGAIN;
break;
}
} }
if (status == -EAGAIN && sk_stream_is_writeable(transport->inet))
status = -ENOBUFS;
switch (status) { switch (status) {
case -ENOTSOCK: case -ENOTSOCK:
...@@ -755,11 +774,19 @@ static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *s ...@@ -755,11 +774,19 @@ static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *s
sk->sk_error_report = transport->old_error_report; sk->sk_error_report = transport->old_error_report;
} }
static void xs_sock_reset_state_flags(struct rpc_xprt *xprt)
{
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
}
static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
{ {
smp_mb__before_atomic(); smp_mb__before_atomic();
clear_bit(XPRT_CLOSE_WAIT, &xprt->state); clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
clear_bit(XPRT_CLOSING, &xprt->state); clear_bit(XPRT_CLOSING, &xprt->state);
xs_sock_reset_state_flags(xprt);
smp_mb__after_atomic(); smp_mb__after_atomic();
} }
...@@ -962,10 +989,13 @@ static void xs_local_data_receive(struct sock_xprt *transport) ...@@ -962,10 +989,13 @@ static void xs_local_data_receive(struct sock_xprt *transport)
goto out; goto out;
for (;;) { for (;;) {
skb = skb_recv_datagram(sk, 0, 1, &err); skb = skb_recv_datagram(sk, 0, 1, &err);
if (skb == NULL) if (skb != NULL) {
xs_local_data_read_skb(&transport->xprt, sk, skb);
skb_free_datagram(sk, skb);
continue;
}
if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
break; break;
xs_local_data_read_skb(&transport->xprt, sk, skb);
skb_free_datagram(sk, skb);
} }
out: out:
mutex_unlock(&transport->recv_mutex); mutex_unlock(&transport->recv_mutex);
...@@ -1043,10 +1073,13 @@ static void xs_udp_data_receive(struct sock_xprt *transport) ...@@ -1043,10 +1073,13 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
goto out; goto out;
for (;;) { for (;;) {
skb = skb_recv_datagram(sk, 0, 1, &err); skb = skb_recv_datagram(sk, 0, 1, &err);
if (skb == NULL) if (skb != NULL) {
xs_udp_data_read_skb(&transport->xprt, sk, skb);
skb_free_datagram(sk, skb);
continue;
}
if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
break; break;
xs_udp_data_read_skb(&transport->xprt, sk, skb);
skb_free_datagram(sk, skb);
} }
out: out:
mutex_unlock(&transport->recv_mutex); mutex_unlock(&transport->recv_mutex);
...@@ -1074,7 +1107,14 @@ static void xs_data_ready(struct sock *sk) ...@@ -1074,7 +1107,14 @@ static void xs_data_ready(struct sock *sk)
if (xprt != NULL) { if (xprt != NULL) {
struct sock_xprt *transport = container_of(xprt, struct sock_xprt *transport = container_of(xprt,
struct sock_xprt, xprt); struct sock_xprt, xprt);
queue_work(rpciod_workqueue, &transport->recv_worker); transport->old_data_ready(sk);
/* Any data means we had a useful conversation, so
* then we don't need to delay the next reconnect
*/
if (xprt->reestablish_timeout)
xprt->reestablish_timeout = 0;
if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
queue_work(xprtiod_workqueue, &transport->recv_worker);
} }
read_unlock_bh(&sk->sk_callback_lock); read_unlock_bh(&sk->sk_callback_lock);
} }
...@@ -1474,10 +1514,15 @@ static void xs_tcp_data_receive(struct sock_xprt *transport) ...@@ -1474,10 +1514,15 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
for (;;) { for (;;) {
lock_sock(sk); lock_sock(sk);
read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
release_sock(sk); if (read <= 0) {
if (read <= 0) clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
break; release_sock(sk);
total += read; if (!test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
break;
} else {
release_sock(sk);
total += read;
}
rd_desc.count = 65536; rd_desc.count = 65536;
} }
out: out:
...@@ -1492,34 +1537,6 @@ static void xs_tcp_data_receive_workfn(struct work_struct *work) ...@@ -1492,34 +1537,6 @@ static void xs_tcp_data_receive_workfn(struct work_struct *work)
xs_tcp_data_receive(transport); xs_tcp_data_receive(transport);
} }
/**
* xs_tcp_data_ready - "data ready" callback for TCP sockets
* @sk: socket with data to read
*
*/
static void xs_tcp_data_ready(struct sock *sk)
{
struct sock_xprt *transport;
struct rpc_xprt *xprt;
dprintk("RPC: xs_tcp_data_ready...\n");
read_lock_bh(&sk->sk_callback_lock);
if (!(xprt = xprt_from_sock(sk)))
goto out;
transport = container_of(xprt, struct sock_xprt, xprt);
/* Any data means we had a useful conversation, so
* the we don't need to delay the next reconnect
*/
if (xprt->reestablish_timeout)
xprt->reestablish_timeout = 0;
queue_work(rpciod_workqueue, &transport->recv_worker);
out:
read_unlock_bh(&sk->sk_callback_lock);
}
/** /**
* xs_tcp_state_change - callback to handle TCP socket state changes * xs_tcp_state_change - callback to handle TCP socket state changes
* @sk: socket whose state has changed * @sk: socket whose state has changed
...@@ -2241,7 +2258,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) ...@@ -2241,7 +2258,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
xs_save_old_callbacks(transport, sk); xs_save_old_callbacks(transport, sk);
sk->sk_user_data = xprt; sk->sk_user_data = xprt;
sk->sk_data_ready = xs_tcp_data_ready; sk->sk_data_ready = xs_data_ready;
sk->sk_state_change = xs_tcp_state_change; sk->sk_state_change = xs_tcp_state_change;
sk->sk_write_space = xs_tcp_write_space; sk->sk_write_space = xs_tcp_write_space;
sock_set_flag(sk, SOCK_FASYNC); sock_set_flag(sk, SOCK_FASYNC);
...@@ -2380,7 +2397,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) ...@@ -2380,7 +2397,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
/* Start by resetting any existing state */ /* Start by resetting any existing state */
xs_reset_transport(transport); xs_reset_transport(transport);
queue_delayed_work(rpciod_workqueue, queue_delayed_work(xprtiod_workqueue,
&transport->connect_worker, &transport->connect_worker,
xprt->reestablish_timeout); xprt->reestablish_timeout);
xprt->reestablish_timeout <<= 1; xprt->reestablish_timeout <<= 1;
...@@ -2390,7 +2407,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) ...@@ -2390,7 +2407,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO; xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
} else { } else {
dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); dprintk("RPC: xs_connect scheduled xprt %p\n", xprt);
queue_delayed_work(rpciod_workqueue, queue_delayed_work(xprtiod_workqueue,
&transport->connect_worker, 0); &transport->connect_worker, 0);
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment