Commit 4bebdd7a authored by Santosh Shilimkar's avatar Santosh Shilimkar

RDS: defer the over_batch work to send worker

Current process gives up if its send work over the batch limit.
The work queue will get  kicked to finish off any other requests.
This fixes remainder condition from commit 443be0e5 ("RDS: make
sure not to loop forever inside rds_send_xmit").

The restart condition is only for the case where we reached to
over_batch code for some other reason so just retrying again
before giving up.

While at it, make sure we use already available 'send_batch_count'
parameter instead of magic value. The batch count threshold value
of 1024 came via commit 443be0e5 ("RDS: make sure not to loop
forever inside rds_send_xmit"). The idea is to process as big a
batch as we can but at the same time we don't hold other waiting
processes for send. Hence back-off after the send_batch_count
limit (1024) to avoid soft-lock ups.
Signed-off-by: default avatarSantosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: default avatarSantosh Shilimkar <santosh.shilimkar@oracle.com>
parent 9b9acde7
...@@ -38,6 +38,7 @@ ...@@ -38,6 +38,7 @@
#include <linux/list.h> #include <linux/list.h>
#include <linux/ratelimit.h> #include <linux/ratelimit.h>
#include <linux/export.h> #include <linux/export.h>
#include <linux/sizes.h>
#include "rds.h" #include "rds.h"
...@@ -51,7 +52,7 @@ ...@@ -51,7 +52,7 @@
* it to 0 will restore the old behavior (where we looped until we had * it to 0 will restore the old behavior (where we looped until we had
* drained the queue). * drained the queue).
*/ */
static int send_batch_count = 64; static int send_batch_count = SZ_1K;
module_param(send_batch_count, int, 0444); module_param(send_batch_count, int, 0444);
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue"); MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
...@@ -223,7 +224,7 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -223,7 +224,7 @@ int rds_send_xmit(struct rds_connection *conn)
* through a lot of messages, lets back off and see * through a lot of messages, lets back off and see
* if anyone else jumps in * if anyone else jumps in
*/ */
if (batch_count >= 1024) if (batch_count >= send_batch_count)
goto over_batch; goto over_batch;
spin_lock_irqsave(&conn->c_lock, flags); spin_lock_irqsave(&conn->c_lock, flags);
...@@ -423,7 +424,9 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -423,7 +424,9 @@ int rds_send_xmit(struct rds_connection *conn)
!list_empty(&conn->c_send_queue)) && !list_empty(&conn->c_send_queue)) &&
send_gen == conn->c_send_gen) { send_gen == conn->c_send_gen) {
rds_stats_inc(s_send_lock_queue_raced); rds_stats_inc(s_send_lock_queue_raced);
goto restart; if (batch_count < send_batch_count)
goto restart;
queue_delayed_work(rds_wq, &conn->c_send_w, 1);
} }
} }
out: out:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment