Commit d9ba131d authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Support dynamic slot allocation for TCP connections

Allow the number of available slots to grow with the TCP window size.
Signed-off-by: default avatarTrond Myklebust <Trond.Myklebust@netapp.com>
parent 21de0a95
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#define RPC_MIN_SLOT_TABLE (2U) #define RPC_MIN_SLOT_TABLE (2U)
#define RPC_DEF_SLOT_TABLE (16U) #define RPC_DEF_SLOT_TABLE (16U)
#define RPC_MAX_SLOT_TABLE (128U) #define RPC_MAX_SLOT_TABLE (128U)
#define RPC_MAX_SLOT_TABLE_LIMIT (65536U)
/* /*
* This describes a timeout strategy * This describes a timeout strategy
...@@ -168,7 +169,9 @@ struct rpc_xprt { ...@@ -168,7 +169,9 @@ struct rpc_xprt {
struct rpc_wait_queue pending; /* requests in flight */ struct rpc_wait_queue pending; /* requests in flight */
struct rpc_wait_queue backlog; /* waiting for slot */ struct rpc_wait_queue backlog; /* waiting for slot */
struct list_head free; /* free slots */ struct list_head free; /* free slots */
unsigned int max_reqs; /* total slots */ unsigned int max_reqs; /* max number of slots */
unsigned int min_reqs; /* min number of slots */
atomic_t num_reqs; /* total slots */
unsigned long state; /* transport state */ unsigned long state; /* transport state */
unsigned char shutdown : 1, /* being shut down */ unsigned char shutdown : 1, /* being shut down */
resvport : 1; /* use a reserved port */ resvport : 1; /* use a reserved port */
...@@ -281,7 +284,9 @@ void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task); ...@@ -281,7 +284,9 @@ void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
void xprt_release(struct rpc_task *task); void xprt_release(struct rpc_task *task);
struct rpc_xprt * xprt_get(struct rpc_xprt *xprt); struct rpc_xprt * xprt_get(struct rpc_xprt *xprt);
void xprt_put(struct rpc_xprt *xprt); void xprt_put(struct rpc_xprt *xprt);
struct rpc_xprt * xprt_alloc(struct net *net, int size, int max_req); struct rpc_xprt * xprt_alloc(struct net *net, size_t size,
unsigned int num_prealloc,
unsigned int max_req);
void xprt_free(struct rpc_xprt *); void xprt_free(struct rpc_xprt *);
static inline __be32 *xprt_skip_transport_header(struct rpc_xprt *xprt, __be32 *p) static inline __be32 *xprt_skip_transport_header(struct rpc_xprt *xprt, __be32 *p)
......
...@@ -935,25 +935,66 @@ void xprt_transmit(struct rpc_task *task) ...@@ -935,25 +935,66 @@ void xprt_transmit(struct rpc_task *task)
spin_unlock_bh(&xprt->transport_lock); spin_unlock_bh(&xprt->transport_lock);
} }
static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags)
{
struct rpc_rqst *req = ERR_PTR(-EAGAIN);
if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs))
goto out;
req = kzalloc(sizeof(struct rpc_rqst), gfp_flags);
if (req != NULL)
goto out;
atomic_dec(&xprt->num_reqs);
req = ERR_PTR(-ENOMEM);
out:
return req;
}
static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) {
kfree(req);
return true;
}
return false;
}
static void xprt_alloc_slot(struct rpc_task *task) static void xprt_alloc_slot(struct rpc_task *task)
{ {
struct rpc_xprt *xprt = task->tk_xprt; struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req;
task->tk_status = 0;
if (!list_empty(&xprt->free)) { if (!list_empty(&xprt->free)) {
struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
list_del_init(&req->rq_list); list_del(&req->rq_list);
task->tk_rqstp = req; goto out_init_req;
xprt_request_init(task, xprt);
return;
} }
req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT);
if (!IS_ERR(req))
goto out_init_req;
switch (PTR_ERR(req)) {
case -ENOMEM:
rpc_delay(task, HZ >> 2);
dprintk("RPC: dynamic allocation of request slot "
"failed! Retrying\n");
break;
case -EAGAIN:
rpc_sleep_on(&xprt->backlog, task, NULL);
dprintk("RPC: waiting for request slot\n"); dprintk("RPC: waiting for request slot\n");
}
task->tk_status = -EAGAIN; task->tk_status = -EAGAIN;
rpc_sleep_on(&xprt->backlog, task, NULL); return;
out_init_req:
task->tk_status = 0;
task->tk_rqstp = req;
xprt_request_init(task, xprt);
} }
static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{ {
if (xprt_dynamic_free_slot(xprt, req))
return;
memset(req, 0, sizeof(*req)); /* mark unused */ memset(req, 0, sizeof(*req)); /* mark unused */
spin_lock(&xprt->reserve_lock); spin_lock(&xprt->reserve_lock);
...@@ -972,7 +1013,9 @@ static void xprt_free_all_slots(struct rpc_xprt *xprt) ...@@ -972,7 +1013,9 @@ static void xprt_free_all_slots(struct rpc_xprt *xprt)
} }
} }
struct rpc_xprt *xprt_alloc(struct net *net, int size, int num_prealloc) struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
unsigned int num_prealloc,
unsigned int max_alloc)
{ {
struct rpc_xprt *xprt; struct rpc_xprt *xprt;
struct rpc_rqst *req; struct rpc_rqst *req;
...@@ -992,7 +1035,12 @@ struct rpc_xprt *xprt_alloc(struct net *net, int size, int num_prealloc) ...@@ -992,7 +1035,12 @@ struct rpc_xprt *xprt_alloc(struct net *net, int size, int num_prealloc)
} }
if (i < num_prealloc) if (i < num_prealloc)
goto out_free; goto out_free;
if (max_alloc > num_prealloc)
xprt->max_reqs = max_alloc;
else
xprt->max_reqs = num_prealloc; xprt->max_reqs = num_prealloc;
xprt->min_reqs = num_prealloc;
atomic_set(&xprt->num_reqs, num_prealloc);
return xprt; return xprt;
...@@ -1036,7 +1084,6 @@ void xprt_reserve(struct rpc_task *task) ...@@ -1036,7 +1084,6 @@ void xprt_reserve(struct rpc_task *task)
if (!xprt_lock_write(xprt, task)) if (!xprt_lock_write(xprt, task))
return; return;
task->tk_status = -EIO;
spin_lock(&xprt->reserve_lock); spin_lock(&xprt->reserve_lock);
xprt_alloc_slot(task); xprt_alloc_slot(task);
spin_unlock(&xprt->reserve_lock); spin_unlock(&xprt->reserve_lock);
...@@ -1057,6 +1104,7 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) ...@@ -1057,6 +1104,7 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{ {
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
INIT_LIST_HEAD(&req->rq_list);
req->rq_timeout = task->tk_client->cl_timeout->to_initval; req->rq_timeout = task->tk_client->cl_timeout->to_initval;
req->rq_task = task; req->rq_task = task;
req->rq_xprt = xprt; req->rq_xprt = xprt;
......
...@@ -283,6 +283,7 @@ xprt_setup_rdma(struct xprt_create *args) ...@@ -283,6 +283,7 @@ xprt_setup_rdma(struct xprt_create *args)
} }
xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt), xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt),
xprt_rdma_slot_table_entries,
xprt_rdma_slot_table_entries); xprt_rdma_slot_table_entries);
if (xprt == NULL) { if (xprt == NULL) {
dprintk("RPC: %s: couldn't allocate rpcrdma_xprt\n", dprintk("RPC: %s: couldn't allocate rpcrdma_xprt\n",
......
...@@ -54,7 +54,8 @@ static void xs_close(struct rpc_xprt *xprt); ...@@ -54,7 +54,8 @@ static void xs_close(struct rpc_xprt *xprt);
* xprtsock tunables * xprtsock tunables
*/ */
unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE; unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE; unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE;
unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE;
unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT; unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT; unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
...@@ -75,6 +76,7 @@ static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO; ...@@ -75,6 +76,7 @@ static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE; static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE; static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT; static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT; static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;
...@@ -103,6 +105,15 @@ static ctl_table xs_tunables_table[] = { ...@@ -103,6 +105,15 @@ static ctl_table xs_tunables_table[] = {
.extra1 = &min_slot_table_size, .extra1 = &min_slot_table_size,
.extra2 = &max_slot_table_size .extra2 = &max_slot_table_size
}, },
{
.procname = "tcp_max_slot_table_entries",
.data = &xprt_max_tcp_slot_table_entries,
.maxlen = sizeof(unsigned int),
.mode = 0644,
.proc_handler = proc_dointvec_minmax,
.extra1 = &min_slot_table_size,
.extra2 = &max_tcp_slot_table_limit
},
{ {
.procname = "min_resvport", .procname = "min_resvport",
.data = &xprt_min_resvport, .data = &xprt_min_resvport,
...@@ -2491,7 +2502,8 @@ static int xs_init_anyaddr(const int family, struct sockaddr *sap) ...@@ -2491,7 +2502,8 @@ static int xs_init_anyaddr(const int family, struct sockaddr *sap)
} }
static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args, static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
unsigned int slot_table_size) unsigned int slot_table_size,
unsigned int max_slot_table_size)
{ {
struct rpc_xprt *xprt; struct rpc_xprt *xprt;
struct sock_xprt *new; struct sock_xprt *new;
...@@ -2501,7 +2513,8 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args, ...@@ -2501,7 +2513,8 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
return ERR_PTR(-EBADF); return ERR_PTR(-EBADF);
} }
xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size); xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size,
max_slot_table_size);
if (xprt == NULL) { if (xprt == NULL) {
dprintk("RPC: xs_setup_xprt: couldn't allocate " dprintk("RPC: xs_setup_xprt: couldn't allocate "
"rpc_xprt\n"); "rpc_xprt\n");
...@@ -2543,7 +2556,8 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) ...@@ -2543,7 +2556,8 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
struct rpc_xprt *xprt; struct rpc_xprt *xprt;
struct rpc_xprt *ret; struct rpc_xprt *ret;
xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries); xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
xprt_max_tcp_slot_table_entries);
if (IS_ERR(xprt)) if (IS_ERR(xprt))
return xprt; return xprt;
transport = container_of(xprt, struct sock_xprt, xprt); transport = container_of(xprt, struct sock_xprt, xprt);
...@@ -2607,7 +2621,8 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) ...@@ -2607,7 +2621,8 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
struct sock_xprt *transport; struct sock_xprt *transport;
struct rpc_xprt *ret; struct rpc_xprt *ret;
xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries); xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries,
xprt_udp_slot_table_entries);
if (IS_ERR(xprt)) if (IS_ERR(xprt))
return xprt; return xprt;
transport = container_of(xprt, struct sock_xprt, xprt); transport = container_of(xprt, struct sock_xprt, xprt);
...@@ -2683,7 +2698,8 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) ...@@ -2683,7 +2698,8 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
struct sock_xprt *transport; struct sock_xprt *transport;
struct rpc_xprt *ret; struct rpc_xprt *ret;
xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries); xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
xprt_max_tcp_slot_table_entries);
if (IS_ERR(xprt)) if (IS_ERR(xprt))
return xprt; return xprt;
transport = container_of(xprt, struct sock_xprt, xprt); transport = container_of(xprt, struct sock_xprt, xprt);
...@@ -2762,7 +2778,8 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args) ...@@ -2762,7 +2778,8 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
*/ */
return args->bc_xprt->xpt_bc_xprt; return args->bc_xprt->xpt_bc_xprt;
} }
xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries); xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
xprt_tcp_slot_table_entries);
if (IS_ERR(xprt)) if (IS_ERR(xprt))
return xprt; return xprt;
transport = container_of(xprt, struct sock_xprt, xprt); transport = container_of(xprt, struct sock_xprt, xprt);
...@@ -2949,8 +2966,26 @@ static struct kernel_param_ops param_ops_slot_table_size = { ...@@ -2949,8 +2966,26 @@ static struct kernel_param_ops param_ops_slot_table_size = {
#define param_check_slot_table_size(name, p) \ #define param_check_slot_table_size(name, p) \
__param_check(name, p, unsigned int); __param_check(name, p, unsigned int);
static int param_set_max_slot_table_size(const char *val,
const struct kernel_param *kp)
{
return param_set_uint_minmax(val, kp,
RPC_MIN_SLOT_TABLE,
RPC_MAX_SLOT_TABLE_LIMIT);
}
static struct kernel_param_ops param_ops_max_slot_table_size = {
.set = param_set_max_slot_table_size,
.get = param_get_uint,
};
#define param_check_max_slot_table_size(name, p) \
__param_check(name, p, unsigned int);
module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries, module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
slot_table_size, 0644); slot_table_size, 0644);
module_param_named(tcp_max_slot_table_entries, xprt_max_tcp_slot_table_entries,
max_slot_table_size, 0644);
module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries, module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
slot_table_size, 0644); slot_table_size, 0644);
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment