Commit ecf78286 authored by Trond Myklebust's avatar Trond Myklebust

Merge branch 'multipath'

* multipath:
  NFS add callback_ops to nfs4_proc_bind_conn_to_session_callback
  pnfs/NFSv4.1: Add multipath capabilities to pNFS flexfiles servers over NFSv3
  SUNRPC: Allow addition of new transports to a struct rpc_clnt
  NFSv4.1: nfs4_proc_bind_conn_to_session must iterate over all connections
  SUNRPC: Make NFS swap work with multipath
  SUNRPC: Add a helper to apply a function to all the rpc_clnt's transports
  SUNRPC: Allow caller to specify the transport to use
  SUNRPC: Use the multipath iterator to assign a transport to each task
  SUNRPC: Make rpc_clnt store the multipath iterators
  SUNRPC: Add a structure to track multiple transports
  SUNRPC: Make freeing of struct xprt rcu-safe
  SUNRPC: Uninline xprt_get(); It isn't performance critical.
  SUNRPC: Reorder rpc_task to put waitqueue related info in same cachelines
  SUNRPC: Remove unused function rpc_task_reset_client
parents cc1f9000 02a95dee
......@@ -6782,13 +6782,26 @@ nfs41_same_server_scope(struct nfs41_server_scope *a,
return false;
}
static void
nfs4_bind_one_conn_to_session_done(struct rpc_task *task, void *calldata)
{
}
static const struct rpc_call_ops nfs4_bind_one_conn_to_session_ops = {
.rpc_call_done = &nfs4_bind_one_conn_to_session_done,
};
/*
* nfs4_proc_bind_conn_to_session()
* nfs4_proc_bind_one_conn_to_session()
*
* The 4.1 client currently uses the same TCP connection for the
* fore and backchannel.
*/
int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred)
static
int nfs4_proc_bind_one_conn_to_session(struct rpc_clnt *clnt,
struct rpc_xprt *xprt,
struct nfs_client *clp,
struct rpc_cred *cred)
{
int status;
struct nfs41_bind_conn_to_session_args args = {
......@@ -6803,6 +6816,14 @@ int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred
.rpc_resp = &res,
.rpc_cred = cred,
};
struct rpc_task_setup task_setup_data = {
.rpc_client = clnt,
.rpc_xprt = xprt,
.callback_ops = &nfs4_bind_one_conn_to_session_ops,
.rpc_message = &msg,
.flags = RPC_TASK_TIMEOUT,
};
struct rpc_task *task;
dprintk("--> %s\n", __func__);
......@@ -6810,7 +6831,16 @@ int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred
if (!(clp->cl_session->flags & SESSION4_BACK_CHAN))
args.dir = NFS4_CDFC4_FORE;
status = rpc_call_sync(clp->cl_rpcclient, &msg, RPC_TASK_TIMEOUT);
/* Do not set the backchannel flag unless this is clnt->cl_xprt */
if (xprt != rcu_access_pointer(clnt->cl_xprt))
args.dir = NFS4_CDFC4_FORE;
task = rpc_run_task(&task_setup_data);
if (!IS_ERR(task)) {
status = task->tk_status;
rpc_put_task(task);
} else
status = PTR_ERR(task);
trace_nfs4_bind_conn_to_session(clp, status);
if (status == 0) {
if (memcmp(res.sessionid.data,
......@@ -6837,6 +6867,31 @@ int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred
return status;
}
struct rpc_bind_conn_calldata {
struct nfs_client *clp;
struct rpc_cred *cred;
};
static int
nfs4_proc_bind_conn_to_session_callback(struct rpc_clnt *clnt,
struct rpc_xprt *xprt,
void *calldata)
{
struct rpc_bind_conn_calldata *p = calldata;
return nfs4_proc_bind_one_conn_to_session(clnt, xprt, p->clp, p->cred);
}
int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred)
{
struct rpc_bind_conn_calldata data = {
.clp = clp,
.cred = cred,
};
return rpc_clnt_iterate_for_each_xprt(clp->cl_rpcclient,
nfs4_proc_bind_conn_to_session_callback, &data);
}
/*
* Minimum set of SP4_MACH_CRED operations from RFC 5661 in the enforce map
* and operations we'd like to see to enable certain features in the allow map
......
......@@ -606,12 +606,22 @@ static int _nfs4_pnfs_v3_ds_connect(struct nfs_server *mds_srv,
dprintk("%s: DS %s: trying address %s\n",
__func__, ds->ds_remotestr, da->da_remotestr);
clp = get_v3_ds_connect(mds_srv->nfs_client,
if (!IS_ERR(clp)) {
struct xprt_create xprt_args = {
.ident = XPRT_TRANSPORT_TCP,
.net = clp->cl_net,
.dstaddr = (struct sockaddr *)&da->da_addr,
.addrlen = da->da_addrlen,
.servername = clp->cl_hostname,
};
/* Add this address as an alias */
rpc_clnt_add_xprt(clp->cl_rpcclient, &xprt_args,
rpc_clnt_test_and_add_xprt, NULL);
} else
clp = get_v3_ds_connect(mds_srv->nfs_client,
(struct sockaddr *)&da->da_addr,
da->da_addrlen, IPPROTO_TCP,
timeo, retrans, au_flavor);
if (!IS_ERR(clp))
break;
}
if (IS_ERR(clp)) {
......
......@@ -25,6 +25,7 @@
#include <asm/signal.h>
#include <linux/path.h>
#include <net/ipv6.h>
#include <linux/sunrpc/xprtmultipath.h>
struct rpc_inode;
......@@ -67,6 +68,7 @@ struct rpc_clnt {
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
struct dentry *cl_debugfs; /* debugfs directory */
#endif
struct rpc_xprt_iter cl_xpi;
};
/*
......@@ -139,7 +141,6 @@ struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
struct rpc_xprt *xprt);
struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *,
const struct rpc_program *, u32);
void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt);
struct rpc_clnt *rpc_clone_client(struct rpc_clnt *);
struct rpc_clnt *rpc_clone_client_set_auth(struct rpc_clnt *,
rpc_authflavor_t);
......@@ -181,6 +182,21 @@ size_t rpc_peeraddr(struct rpc_clnt *, struct sockaddr *, size_t);
const char *rpc_peeraddr2str(struct rpc_clnt *, enum rpc_display_format_t);
int rpc_localaddr(struct rpc_clnt *, struct sockaddr *, size_t);
int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
void *data);
int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
struct rpc_xprt_switch *xps,
struct rpc_xprt *xprt,
void *dummy);
int rpc_clnt_add_xprt(struct rpc_clnt *, struct xprt_create *,
int (*setup)(struct rpc_clnt *,
struct rpc_xprt_switch *,
struct rpc_xprt *,
void *),
void *data);
const char *rpc_proc_name(const struct rpc_task *task);
#endif /* __KERNEL__ */
#endif /* _LINUX_SUNRPC_CLNT_H */
......@@ -42,40 +42,43 @@ struct rpc_wait {
*/
struct rpc_task {
atomic_t tk_count; /* Reference count */
int tk_status; /* result of last operation */
struct list_head tk_task; /* global list of tasks */
struct rpc_clnt * tk_client; /* RPC client */
struct rpc_rqst * tk_rqstp; /* RPC request */
/*
* RPC call state
*/
struct rpc_message tk_msg; /* RPC call info */
/*
* callback to be executed after waking up
* action next procedure for async tasks
* tk_ops caller callbacks
*/
void (*tk_callback)(struct rpc_task *);
void (*tk_action)(struct rpc_task *);
const struct rpc_call_ops *tk_ops;
void * tk_calldata;
unsigned long tk_timeout; /* timeout for rpc_sleep() */
unsigned long tk_runstate; /* Task run status */
struct workqueue_struct *tk_workqueue; /* Normally rpciod, but could
* be any workqueue
*/
struct rpc_wait_queue *tk_waitqueue; /* RPC wait queue we're on */
union {
struct work_struct tk_work; /* Async task work queue */
struct rpc_wait tk_wait; /* RPC wait */
} u;
/*
* RPC call state
*/
struct rpc_message tk_msg; /* RPC call info */
void * tk_calldata; /* Caller private data */
const struct rpc_call_ops *tk_ops; /* Caller callbacks */
struct rpc_clnt * tk_client; /* RPC client */
struct rpc_xprt * tk_xprt; /* Transport */
struct rpc_rqst * tk_rqstp; /* RPC request */
struct workqueue_struct *tk_workqueue; /* Normally rpciod, but could
* be any workqueue
*/
ktime_t tk_start; /* RPC task init timestamp */
pid_t tk_owner; /* Process id for batching tasks */
int tk_status; /* result of last operation */
unsigned short tk_flags; /* misc flags */
unsigned short tk_timeouts; /* maj timeouts */
......@@ -100,6 +103,7 @@ struct rpc_call_ops {
struct rpc_task_setup {
struct rpc_task *task;
struct rpc_clnt *rpc_client;
struct rpc_xprt *rpc_xprt;
const struct rpc_message *rpc_message;
const struct rpc_call_ops *callback_ops;
void *callback_data;
......
......@@ -13,6 +13,7 @@
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/ktime.h>
#include <linux/kref.h>
#include <linux/sunrpc/sched.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/msg_prot.h>
......@@ -166,7 +167,7 @@ enum xprt_transports {
};
struct rpc_xprt {
atomic_t count; /* Reference count */
struct kref kref; /* Reference count */
struct rpc_xprt_ops * ops; /* transport methods */
const struct rpc_timeout *timeout; /* timeout parms */
......@@ -196,6 +197,11 @@ struct rpc_xprt {
transport */
unsigned int bind_index; /* bind function index */
/*
* Multipath
*/
struct list_head xprt_switch;
/*
* Connection of transports
*/
......@@ -256,6 +262,7 @@ struct rpc_xprt {
struct dentry *debugfs; /* debugfs directory */
atomic_t inject_disconnect;
#endif
struct rcu_head rcu;
};
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
......@@ -318,24 +325,13 @@ int xprt_adjust_timeout(struct rpc_rqst *req);
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task);
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
void xprt_release(struct rpc_task *task);
struct rpc_xprt * xprt_get(struct rpc_xprt *xprt);
void xprt_put(struct rpc_xprt *xprt);
struct rpc_xprt * xprt_alloc(struct net *net, size_t size,
unsigned int num_prealloc,
unsigned int max_req);
void xprt_free(struct rpc_xprt *);
/**
* xprt_get - return a reference to an RPC transport.
* @xprt: pointer to the transport
*
*/
static inline struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
if (atomic_inc_not_zero(&xprt->count))
return xprt;
return NULL;
}
static inline __be32 *xprt_skip_transport_header(struct rpc_xprt *xprt, __be32 *p)
{
return p + xprt->tsh_size;
......
/*
* RPC client multipathing definitions
*
* Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved.
*
* Trond Myklebust <trond.myklebust@primarydata.com>
*/
#ifndef _NET_SUNRPC_XPRTMULTIPATH_H
#define _NET_SUNRPC_XPRTMULTIPATH_H
struct rpc_xprt_iter_ops;
struct rpc_xprt_switch {
spinlock_t xps_lock;
struct kref xps_kref;
unsigned int xps_nxprts;
struct list_head xps_xprt_list;
struct net * xps_net;
const struct rpc_xprt_iter_ops *xps_iter_ops;
struct rcu_head xps_rcu;
};
struct rpc_xprt_iter {
struct rpc_xprt_switch __rcu *xpi_xpswitch;
struct rpc_xprt * xpi_cursor;
const struct rpc_xprt_iter_ops *xpi_ops;
};
struct rpc_xprt_iter_ops {
void (*xpi_rewind)(struct rpc_xprt_iter *);
struct rpc_xprt *(*xpi_xprt)(struct rpc_xprt_iter *);
struct rpc_xprt *(*xpi_next)(struct rpc_xprt_iter *);
};
extern struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt,
gfp_t gfp_flags);
extern struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps);
extern void xprt_switch_put(struct rpc_xprt_switch *xps);
extern void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps);
extern void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps,
struct rpc_xprt *xprt);
extern void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps,
struct rpc_xprt *xprt);
extern void xprt_iter_init(struct rpc_xprt_iter *xpi,
struct rpc_xprt_switch *xps);
extern void xprt_iter_init_listall(struct rpc_xprt_iter *xpi,
struct rpc_xprt_switch *xps);
extern void xprt_iter_destroy(struct rpc_xprt_iter *xpi);
extern struct rpc_xprt_switch *xprt_iter_xchg_switch(
struct rpc_xprt_iter *xpi,
struct rpc_xprt_switch *newswitch);
extern struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi);
extern struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi);
extern struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi);
#endif
......@@ -12,7 +12,8 @@ sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \
svc.o svcsock.o svcauth.o svcauth_unix.o \
addr.o rpcb_clnt.o timer.o xdr.o \
sunrpc_syms.o cache.o rpc_pipe.o \
svc_xprt.o
svc_xprt.o \
xprtmultipath.o
sunrpc-$(CONFIG_SUNRPC_DEBUG) += debugfs.o
sunrpc-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel_rqst.o
sunrpc-$(CONFIG_PROC_FS) += stats.o
......
......@@ -1181,12 +1181,12 @@ static struct rpc_auth *
gss_create(struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
{
struct gss_auth *gss_auth;
struct rpc_xprt *xprt = rcu_access_pointer(clnt->cl_xprt);
struct rpc_xprt_switch *xps = rcu_access_pointer(clnt->cl_xpi.xpi_xpswitch);
while (clnt != clnt->cl_parent) {
struct rpc_clnt *parent = clnt->cl_parent;
/* Find the original parent for this transport */
if (rcu_access_pointer(parent->cl_xprt) != xprt)
if (rcu_access_pointer(parent->cl_xpi.xpi_xpswitch) != xps)
break;
clnt = parent;
}
......
......@@ -354,6 +354,7 @@ static void rpc_free_clid(struct rpc_clnt *clnt)
}
static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
struct rpc_xprt_switch *xps,
struct rpc_xprt *xprt,
struct rpc_clnt *parent)
{
......@@ -411,6 +412,8 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
}
rpc_clnt_set_transport(clnt, xprt, timeout);
xprt_iter_init(&clnt->cl_xpi, xps);
xprt_switch_put(xps);
clnt->cl_rtt = &clnt->cl_rtt_default;
rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
......@@ -438,6 +441,7 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
out_err:
rpciod_down();
out_no_rpciod:
xprt_switch_put(xps);
xprt_put(xprt);
return ERR_PTR(err);
}
......@@ -446,8 +450,13 @@ struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
struct rpc_xprt *xprt)
{
struct rpc_clnt *clnt = NULL;
struct rpc_xprt_switch *xps;
clnt = rpc_new_client(args, xprt, NULL);
xps = xprt_switch_alloc(xprt, GFP_KERNEL);
if (xps == NULL)
return ERR_PTR(-ENOMEM);
clnt = rpc_new_client(args, xps, xprt, NULL);
if (IS_ERR(clnt))
return clnt;
......@@ -564,6 +573,7 @@ EXPORT_SYMBOL_GPL(rpc_create);
static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
struct rpc_clnt *clnt)
{
struct rpc_xprt_switch *xps;
struct rpc_xprt *xprt;
struct rpc_clnt *new;
int err;
......@@ -571,13 +581,17 @@ static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
err = -ENOMEM;
rcu_read_lock();
xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
rcu_read_unlock();
if (xprt == NULL)
if (xprt == NULL || xps == NULL) {
xprt_put(xprt);
xprt_switch_put(xps);
goto out_err;
}
args->servername = xprt->servername;
args->nodename = clnt->cl_nodename;
new = rpc_new_client(args, xprt, clnt);
new = rpc_new_client(args, xps, xprt, clnt);
if (IS_ERR(new)) {
err = PTR_ERR(new);
goto out_err;
......@@ -657,6 +671,7 @@ int rpc_switch_client_transport(struct rpc_clnt *clnt,
{
const struct rpc_timeout *old_timeo;
rpc_authflavor_t pseudoflavor;
struct rpc_xprt_switch *xps, *oldxps;
struct rpc_xprt *xprt, *old;
struct rpc_clnt *parent;
int err;
......@@ -668,10 +683,17 @@ int rpc_switch_client_transport(struct rpc_clnt *clnt,
return PTR_ERR(xprt);
}
xps = xprt_switch_alloc(xprt, GFP_KERNEL);
if (xps == NULL) {
xprt_put(xprt);
return -ENOMEM;
}
pseudoflavor = clnt->cl_auth->au_flavor;
old_timeo = clnt->cl_timeout;
old = rpc_clnt_set_transport(clnt, xprt, timeout);
oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
rpc_unregister_client(clnt);
__rpc_clnt_remove_pipedir(clnt);
......@@ -697,20 +719,74 @@ int rpc_switch_client_transport(struct rpc_clnt *clnt,
synchronize_rcu();
if (parent != clnt)
rpc_release_client(parent);
xprt_switch_put(oldxps);
xprt_put(old);
dprintk("RPC: replaced xprt for clnt %p\n", clnt);
return 0;
out_revert:
xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
rpc_clnt_set_transport(clnt, old, old_timeo);
clnt->cl_parent = parent;
rpc_client_register(clnt, pseudoflavor, NULL);
xprt_switch_put(xps);
xprt_put(xprt);
dprintk("RPC: failed to switch xprt for clnt %p\n", clnt);
return err;
}
EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
static
int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
{
struct rpc_xprt_switch *xps;
rcu_read_lock();
xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
rcu_read_unlock();
if (xps == NULL)
return -EAGAIN;
xprt_iter_init_listall(xpi, xps);
xprt_switch_put(xps);
return 0;
}
/**
* rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
* @clnt: pointer to client
* @fn: function to apply
* @data: void pointer to function data
*
* Iterates through the list of RPC transports currently attached to the
* client and applies the function fn(clnt, xprt, data).
*
* On error, the iteration stops, and the function returns the error value.
*/
int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
void *data)
{
struct rpc_xprt_iter xpi;
int ret;
ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
if (ret)
return ret;
for (;;) {
struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
if (!xprt)
break;
ret = fn(clnt, xprt, data);
xprt_put(xprt);
if (ret < 0)
break;
}
xprt_iter_destroy(&xpi);
return ret;
}
EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
/*
* Kill all tasks for the given client.
* XXX: kill their descendants as well?
......@@ -783,6 +859,7 @@ rpc_free_client(struct rpc_clnt *clnt)
rpc_free_iostats(clnt->cl_metrics);
clnt->cl_metrics = NULL;
xprt_put(rcu_dereference_raw(clnt->cl_xprt));
xprt_iter_destroy(&clnt->cl_xpi);
rpciod_down();
rpc_free_clid(clnt);
kfree(clnt);
......@@ -868,6 +945,7 @@ EXPORT_SYMBOL_GPL(rpc_bind_new_program);
void rpc_task_release_client(struct rpc_task *task)
{
struct rpc_clnt *clnt = task->tk_client;
struct rpc_xprt *xprt = task->tk_xprt;
if (clnt != NULL) {
/* Remove from client task list */
......@@ -878,13 +956,22 @@ void rpc_task_release_client(struct rpc_task *task)
rpc_release_client(clnt);
}
if (xprt != NULL) {
task->tk_xprt = NULL;
xprt_put(xprt);
}
}
static
void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
{
if (clnt != NULL) {
rpc_task_release_client(task);
if (task->tk_xprt == NULL)
task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi);
task->tk_client = clnt;
atomic_inc(&clnt->cl_count);
if (clnt->cl_softrtry)
......@@ -900,14 +987,6 @@ void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
}
}
void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
{
rpc_task_release_client(task);
rpc_task_set_client(task, clnt);
}
EXPORT_SYMBOL_GPL(rpc_task_reset_client);
static void
rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
{
......@@ -2104,11 +2183,9 @@ call_timeout(struct rpc_task *task)
}
if (RPC_IS_SOFT(task)) {
if (clnt->cl_chatty) {
rcu_read_lock();
printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
clnt->cl_program->name,
rcu_dereference(clnt->cl_xprt)->servername);
rcu_read_unlock();
task->tk_xprt->servername);
}
if (task->tk_flags & RPC_TASK_TIMEOUT)
rpc_exit(task, -ETIMEDOUT);
......@@ -2120,11 +2197,9 @@ call_timeout(struct rpc_task *task)
if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
task->tk_flags |= RPC_CALL_MAJORSEEN;
if (clnt->cl_chatty) {
rcu_read_lock();
printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
clnt->cl_program->name,
rcu_dereference(clnt->cl_xprt)->servername);
rcu_read_unlock();
task->tk_xprt->servername);
}
}
rpc_force_rebind(clnt);
......@@ -2154,11 +2229,9 @@ call_decode(struct rpc_task *task)
if (task->tk_flags & RPC_CALL_MAJORSEEN) {
if (clnt->cl_chatty) {
rcu_read_lock();
printk(KERN_NOTICE "%s: server %s OK\n",
clnt->cl_program->name,
rcu_dereference(clnt->cl_xprt)->servername);
rcu_read_unlock();
task->tk_xprt->servername);
}
task->tk_flags &= ~RPC_CALL_MAJORSEEN;
}
......@@ -2312,11 +2385,9 @@ rpc_verify_header(struct rpc_task *task)
task->tk_action = call_bind;
goto out_retry;
case RPC_AUTH_TOOWEAK:
rcu_read_lock();
printk(KERN_NOTICE "RPC: server %s requires stronger "
"authentication.\n",
rcu_dereference(clnt->cl_xprt)->servername);
rcu_read_unlock();
task->tk_xprt->servername);
break;
default:
dprintk("RPC: %5u %s: unknown auth error: %x\n",
......@@ -2341,27 +2412,27 @@ rpc_verify_header(struct rpc_task *task)
case RPC_SUCCESS:
return p;
case RPC_PROG_UNAVAIL:
dprintk_rcu("RPC: %5u %s: program %u is unsupported "
dprintk("RPC: %5u %s: program %u is unsupported "
"by server %s\n", task->tk_pid, __func__,
(unsigned int)clnt->cl_prog,
rcu_dereference(clnt->cl_xprt)->servername);
task->tk_xprt->servername);
error = -EPFNOSUPPORT;
goto out_err;
case RPC_PROG_MISMATCH:
dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported "
dprintk("RPC: %5u %s: program %u, version %u unsupported "
"by server %s\n", task->tk_pid, __func__,
(unsigned int)clnt->cl_prog,
(unsigned int)clnt->cl_vers,
rcu_dereference(clnt->cl_xprt)->servername);
task->tk_xprt->servername);
error = -EPROTONOSUPPORT;
goto out_err;
case RPC_PROC_UNAVAIL:
dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, "
dprintk("RPC: %5u %s: proc %s unsupported by program %u, "
"version %u on server %s\n",
task->tk_pid, __func__,
rpc_proc_name(task),
clnt->cl_prog, clnt->cl_vers,
rcu_dereference(clnt->cl_xprt)->servername);
task->tk_xprt->servername);
error = -EOPNOTSUPP;
goto out_err;
case RPC_GARBAGE_ARGS:
......@@ -2421,7 +2492,10 @@ static int rpc_ping(struct rpc_clnt *clnt)
return err;
}
struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
static
struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
const struct rpc_call_ops *ops, void *data)
{
struct rpc_message msg = {
.rpc_proc = &rpcproc_null,
......@@ -2429,14 +2503,140 @@ struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int
};
struct rpc_task_setup task_setup_data = {
.rpc_client = clnt,
.rpc_xprt = xprt,
.rpc_message = &msg,
.callback_ops = &rpc_default_ops,
.callback_ops = (ops != NULL) ? ops : &rpc_default_ops,
.callback_data = data,
.flags = flags,
};
return rpc_run_task(&task_setup_data);
}
struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
{
return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
}
EXPORT_SYMBOL_GPL(rpc_call_null);
struct rpc_cb_add_xprt_calldata {
struct rpc_xprt_switch *xps;
struct rpc_xprt *xprt;
};
static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
{
struct rpc_cb_add_xprt_calldata *data = calldata;
if (task->tk_status == 0)
rpc_xprt_switch_add_xprt(data->xps, data->xprt);
}
static void rpc_cb_add_xprt_release(void *calldata)
{
struct rpc_cb_add_xprt_calldata *data = calldata;
xprt_put(data->xprt);
xprt_switch_put(data->xps);
kfree(data);
}
const static struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
.rpc_call_done = rpc_cb_add_xprt_done,
.rpc_release = rpc_cb_add_xprt_release,
};
/**
* rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
* @clnt: pointer to struct rpc_clnt
* @xps: pointer to struct rpc_xprt_switch,
* @xprt: pointer struct rpc_xprt
* @dummy: unused
*/
int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
void *dummy)
{
struct rpc_cb_add_xprt_calldata *data;
struct rpc_cred *cred;
struct rpc_task *task;
data = kmalloc(sizeof(*data), GFP_NOFS);
if (!data)
return -ENOMEM;
data->xps = xprt_switch_get(xps);
data->xprt = xprt_get(xprt);
cred = authnull_ops.lookup_cred(NULL, NULL, 0);
task = rpc_call_null_helper(clnt, xprt, cred,
RPC_TASK_SOFT|RPC_TASK_SOFTCONN|RPC_TASK_ASYNC,
&rpc_cb_add_xprt_call_ops, data);
put_rpccred(cred);
if (IS_ERR(task))
return PTR_ERR(task);
rpc_put_task(task);
return 1;
}
EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
/**
* rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
* @clnt: pointer to struct rpc_clnt
* @xprtargs: pointer to struct xprt_create
* @setup: callback to test and/or set up the connection
* @data: pointer to setup function data
*
* Creates a new transport using the parameters set in args and
* adds it to clnt.
* If ping is set, then test that connectivity succeeds before
* adding the new transport.
*
*/
int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
struct xprt_create *xprtargs,
int (*setup)(struct rpc_clnt *,
struct rpc_xprt_switch *,
struct rpc_xprt *,
void *),
void *data)
{
struct rpc_xprt_switch *xps;
struct rpc_xprt *xprt;
unsigned char resvport;
int ret = 0;
rcu_read_lock();
xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
xprt = xprt_iter_xprt(&clnt->cl_xpi);
if (xps == NULL || xprt == NULL) {
rcu_read_unlock();
return -EAGAIN;
}
resvport = xprt->resvport;
rcu_read_unlock();
xprt = xprt_create_transport(xprtargs);
if (IS_ERR(xprt)) {
ret = PTR_ERR(xprt);
goto out_put_switch;
}
xprt->resvport = resvport;
rpc_xprt_switch_set_roundrobin(xps);
if (setup) {
ret = setup(clnt, xps, xprt, data);
if (ret != 0)
goto out_put_xprt;
}
rpc_xprt_switch_add_xprt(xps, xprt);
out_put_xprt:
xprt_put(xprt);
out_put_switch:
xprt_switch_put(xps);
return ret;
}
EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static void rpc_show_header(void)
{
......@@ -2483,57 +2683,39 @@ void rpc_show_tasks(struct net *net)
#endif
#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
static int
rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
struct rpc_xprt *xprt,
void *dummy)
{
return xprt_enable_swap(xprt);
}
int
rpc_clnt_swap_activate(struct rpc_clnt *clnt)
{
int ret = 0;
struct rpc_xprt *xprt;
if (atomic_inc_return(&clnt->cl_swapper) == 1) {
retry:
rcu_read_lock();
xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
rcu_read_unlock();
if (!xprt) {
/*
* If we didn't get a reference, then we likely are
* racing with a migration event. Wait for a grace
* period and try again.
*/
synchronize_rcu();
goto retry;
}
ret = xprt_enable_swap(xprt);
xprt_put(xprt);
}
return ret;
if (atomic_inc_return(&clnt->cl_swapper) == 1)
return rpc_clnt_iterate_for_each_xprt(clnt,
rpc_clnt_swap_activate_callback, NULL);
return 0;
}
EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
static int
rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
struct rpc_xprt *xprt,
void *dummy)
{
xprt_disable_swap(xprt);
return 0;
}
void
rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
{
struct rpc_xprt *xprt;
if (atomic_dec_if_positive(&clnt->cl_swapper) == 0) {
retry:
rcu_read_lock();
xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
rcu_read_unlock();
if (!xprt) {
/*
* If we didn't get a reference, then we likely are
* racing with a migration event. Wait for a grace
* period and try again.
*/
synchronize_rcu();
goto retry;
}
xprt_disable_swap(xprt);
xprt_put(xprt);
}
if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
rpc_clnt_iterate_for_each_xprt(clnt,
rpc_clnt_swap_deactivate_callback, NULL);
}
EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
#endif /* CONFIG_SUNRPC_SWAP */
......@@ -648,10 +648,10 @@ static struct rpc_task *rpcb_call_async(struct rpc_clnt *rpcb_clnt, struct rpcbi
static struct rpc_clnt *rpcb_find_transport_owner(struct rpc_clnt *clnt)
{
struct rpc_clnt *parent = clnt->cl_parent;
struct rpc_xprt *xprt = rcu_dereference(clnt->cl_xprt);
struct rpc_xprt_switch *xps = rcu_access_pointer(clnt->cl_xpi.xpi_xpswitch);
while (parent != clnt) {
if (rcu_dereference(parent->cl_xprt) != xprt)
if (rcu_access_pointer(parent->cl_xpi.xpi_xpswitch) != xps)
break;
if (clnt->cl_autobind)
break;
......@@ -683,11 +683,9 @@ void rpcb_getport_async(struct rpc_task *task)
int status;
rcu_read_lock();
do {
clnt = rpcb_find_transport_owner(task->tk_client);
xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
} while (xprt == NULL);
clnt = rpcb_find_transport_owner(task->tk_client);
rcu_read_unlock();
xprt = xprt_get(task->tk_xprt);
dprintk("RPC: %5u %s(%s, %u, %u, %d)\n",
task->tk_pid, __func__,
......
......@@ -909,6 +909,8 @@ static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *ta
/* Initialize workqueue for async tasks */
task->tk_workqueue = task_setup_data->workqueue;
task->tk_xprt = xprt_get(task_setup_data->rpc_xprt);
if (task->tk_ops->rpc_call_prepare != NULL)
task->tk_action = rpc_prepare_task;
......
......@@ -48,6 +48,7 @@
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>
#include <linux/sunrpc/bc_xprt.h>
#include <linux/rcupdate.h>
#include <trace/events/sunrpc.h>
......@@ -1166,7 +1167,7 @@ void xprt_free(struct rpc_xprt *xprt)
{
put_net(xprt->xprt_net);
xprt_free_all_slots(xprt);
kfree(xprt);
kfree_rcu(xprt, rcu);
}
EXPORT_SYMBOL_GPL(xprt_free);
......@@ -1180,7 +1181,7 @@ EXPORT_SYMBOL_GPL(xprt_free);
*/
void xprt_reserve(struct rpc_task *task)
{
struct rpc_xprt *xprt;
struct rpc_xprt *xprt = task->tk_xprt;
task->tk_status = 0;
if (task->tk_rqstp != NULL)
......@@ -1188,11 +1189,8 @@ void xprt_reserve(struct rpc_task *task)
task->tk_timeout = 0;
task->tk_status = -EAGAIN;
rcu_read_lock();
xprt = rcu_dereference(task->tk_client->cl_xprt);
if (!xprt_throttle_congested(xprt, task))
xprt->ops->alloc_slot(xprt, task);
rcu_read_unlock();
}
/**
......@@ -1206,7 +1204,7 @@ void xprt_reserve(struct rpc_task *task)
*/
void xprt_retry_reserve(struct rpc_task *task)
{
struct rpc_xprt *xprt;
struct rpc_xprt *xprt = task->tk_xprt;
task->tk_status = 0;
if (task->tk_rqstp != NULL)
......@@ -1214,10 +1212,7 @@ void xprt_retry_reserve(struct rpc_task *task)
task->tk_timeout = 0;
task->tk_status = -EAGAIN;
rcu_read_lock();
xprt = rcu_dereference(task->tk_client->cl_xprt);
xprt->ops->alloc_slot(xprt, task);
rcu_read_unlock();
}
static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
......@@ -1264,11 +1259,9 @@ void xprt_release(struct rpc_task *task)
if (req == NULL) {
if (task->tk_client) {
rcu_read_lock();
xprt = rcu_dereference(task->tk_client->cl_xprt);
xprt = task->tk_xprt;
if (xprt->snd_task == task)
xprt_release_write(xprt, task);
rcu_read_unlock();
}
return;
}
......@@ -1307,7 +1300,7 @@ void xprt_release(struct rpc_task *task)
static void xprt_init(struct rpc_xprt *xprt, struct net *net)
{
atomic_set(&xprt->count, 1);
kref_init(&xprt->kref);
spin_lock_init(&xprt->transport_lock);
spin_lock_init(&xprt->reserve_lock);
......@@ -1318,6 +1311,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
spin_lock_init(&xprt->bc_pa_lock);
INIT_LIST_HEAD(&xprt->bc_pa_list);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
INIT_LIST_HEAD(&xprt->xprt_switch);
xprt->last_used = jiffies;
xprt->cwnd = RPC_INITCWND;
......@@ -1415,6 +1409,24 @@ static void xprt_destroy(struct rpc_xprt *xprt)
xprt->ops->destroy(xprt);
}
static void xprt_destroy_kref(struct kref *kref)
{
xprt_destroy(container_of(kref, struct rpc_xprt, kref));
}
/**
* xprt_get - return a reference to an RPC transport.
* @xprt: pointer to the transport
*
*/
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
return xprt;
return NULL;
}
EXPORT_SYMBOL_GPL(xprt_get);
/**
* xprt_put - release a reference to an RPC transport.
* @xprt: pointer to the transport
......@@ -1422,7 +1434,7 @@ static void xprt_destroy(struct rpc_xprt *xprt)
*/
void xprt_put(struct rpc_xprt *xprt)
{
if (atomic_dec_and_test(&xprt->count))
xprt_destroy(xprt);
if (xprt != NULL)
kref_put(&xprt->kref, xprt_destroy_kref);
}
EXPORT_SYMBOL_GPL(xprt_put);
/*
* Multipath support for RPC
*
* Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved.
*
* Trond Myklebust <trond.myklebust@primarydata.com>
*
*/
#include <linux/types.h>
#include <linux/kref.h>
#include <linux/list.h>
#include <linux/rcupdate.h>
#include <linux/rculist.h>
#include <linux/slab.h>
#include <asm/cmpxchg.h>
#include <linux/spinlock.h>
#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/xprtmultipath.h>
typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head,
const struct rpc_xprt *cur);
static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular;
static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin;
static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall;
static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps,
struct rpc_xprt *xprt)
{
if (unlikely(xprt_get(xprt) == NULL))
return;
list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list);
smp_wmb();
if (xps->xps_nxprts == 0)
xps->xps_net = xprt->xprt_net;
xps->xps_nxprts++;
}
/**
* rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch
* @xps: pointer to struct rpc_xprt_switch
* @xprt: pointer to struct rpc_xprt
*
* Adds xprt to the end of the list of struct rpc_xprt in xps.
*/
void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps,
struct rpc_xprt *xprt)
{
if (xprt == NULL)
return;
spin_lock(&xps->xps_lock);
if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL)
xprt_switch_add_xprt_locked(xps, xprt);
spin_unlock(&xps->xps_lock);
}
static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps,
struct rpc_xprt *xprt)
{
if (unlikely(xprt == NULL))
return;
xps->xps_nxprts--;
if (xps->xps_nxprts == 0)
xps->xps_net = NULL;
smp_wmb();
list_del_rcu(&xprt->xprt_switch);
}
/**
* rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch
* @xps: pointer to struct rpc_xprt_switch
* @xprt: pointer to struct rpc_xprt
*
* Removes xprt from the list of struct rpc_xprt in xps.
*/
void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps,
struct rpc_xprt *xprt)
{
spin_lock(&xps->xps_lock);
xprt_switch_remove_xprt_locked(xps, xprt);
spin_unlock(&xps->xps_lock);
xprt_put(xprt);
}
/**
* xprt_switch_alloc - Allocate a new struct rpc_xprt_switch
* @xprt: pointer to struct rpc_xprt
* @gfp_flags: allocation flags
*
* On success, returns an initialised struct rpc_xprt_switch, containing
* the entry xprt. Returns NULL on failure.
*/
struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt,
gfp_t gfp_flags)
{
struct rpc_xprt_switch *xps;
xps = kmalloc(sizeof(*xps), gfp_flags);
if (xps != NULL) {
spin_lock_init(&xps->xps_lock);
kref_init(&xps->xps_kref);
xps->xps_nxprts = 0;
INIT_LIST_HEAD(&xps->xps_xprt_list);
xps->xps_iter_ops = &rpc_xprt_iter_singular;
xprt_switch_add_xprt_locked(xps, xprt);
}
return xps;
}
static void xprt_switch_free_entries(struct rpc_xprt_switch *xps)
{
spin_lock(&xps->xps_lock);
while (!list_empty(&xps->xps_xprt_list)) {
struct rpc_xprt *xprt;
xprt = list_first_entry(&xps->xps_xprt_list,
struct rpc_xprt, xprt_switch);
xprt_switch_remove_xprt_locked(xps, xprt);
spin_unlock(&xps->xps_lock);
xprt_put(xprt);
spin_lock(&xps->xps_lock);
}
spin_unlock(&xps->xps_lock);
}
static void xprt_switch_free(struct kref *kref)
{
struct rpc_xprt_switch *xps = container_of(kref,
struct rpc_xprt_switch, xps_kref);
xprt_switch_free_entries(xps);
kfree_rcu(xps, xps_rcu);
}
/**
* xprt_switch_get - Return a reference to a rpc_xprt_switch
* @xps: pointer to struct rpc_xprt_switch
*
* Returns a reference to xps unless the refcount is already zero.
*/
struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps)
{
if (xps != NULL && kref_get_unless_zero(&xps->xps_kref))
return xps;
return NULL;
}
/**
* xprt_switch_put - Release a reference to a rpc_xprt_switch
* @xps: pointer to struct rpc_xprt_switch
*
* Release the reference to xps, and free it once the refcount is zero.
*/
void xprt_switch_put(struct rpc_xprt_switch *xps)
{
if (xps != NULL)
kref_put(&xps->xps_kref, xprt_switch_free);
}
/**
* rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch
* @xps: pointer to struct rpc_xprt_switch
*
* Sets a round-robin default policy for iterators acting on xps.
*/
void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps)
{
if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin)
WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin);
}
static
const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi)
{
if (xpi->xpi_ops != NULL)
return xpi->xpi_ops;
return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops;
}
static
void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi)
{
}
static
void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi)
{
WRITE_ONCE(xpi->xpi_cursor, NULL);
}
static
struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head)
{
return list_first_or_null_rcu(head, struct rpc_xprt, xprt_switch);
}
static
struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi)
{
struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
if (xps == NULL)
return NULL;
return xprt_switch_find_first_entry(&xps->xps_xprt_list);
}
static
struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head,
const struct rpc_xprt *cur)
{
struct rpc_xprt *pos;
list_for_each_entry_rcu(pos, head, xprt_switch) {
if (cur == pos)
return pos;
}
return NULL;
}
static
struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi)
{
struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
struct list_head *head;
if (xps == NULL)
return NULL;
head = &xps->xps_xprt_list;
if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2)
return xprt_switch_find_first_entry(head);
return xprt_switch_find_current_entry(head, xpi->xpi_cursor);
}
static
struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head,
const struct rpc_xprt *cur)
{
struct rpc_xprt *pos, *prev = NULL;
list_for_each_entry_rcu(pos, head, xprt_switch) {
if (cur == prev)
return pos;
prev = pos;
}
return NULL;
}
static
struct rpc_xprt *xprt_switch_set_next_cursor(struct list_head *head,
struct rpc_xprt **cursor,
xprt_switch_find_xprt_t find_next)
{
struct rpc_xprt *cur, *pos, *old;
cur = READ_ONCE(*cursor);
for (;;) {
old = cur;
pos = find_next(head, old);
if (pos == NULL)
break;
cur = cmpxchg_relaxed(cursor, old, pos);
if (cur == old)
break;
}
return pos;
}
static
struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi,
xprt_switch_find_xprt_t find_next)
{
struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
struct list_head *head;
if (xps == NULL)
return NULL;
head = &xps->xps_xprt_list;
if (xps->xps_nxprts < 2)
return xprt_switch_find_first_entry(head);
return xprt_switch_set_next_cursor(head, &xpi->xpi_cursor, find_next);
}
static
struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head,
const struct rpc_xprt *cur)
{
struct rpc_xprt *ret;
ret = xprt_switch_find_next_entry(head, cur);
if (ret != NULL)
return ret;
return xprt_switch_find_first_entry(head);
}
static
struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi)
{
return xprt_iter_next_entry_multiple(xpi,
xprt_switch_find_next_entry_roundrobin);
}
static
struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi)
{
return xprt_iter_next_entry_multiple(xpi, xprt_switch_find_next_entry);
}
/*
* xprt_iter_rewind - Resets the xprt iterator
* @xpi: pointer to rpc_xprt_iter
*
* Resets xpi to ensure that it points to the first entry in the list
* of transports.
*/
static
void xprt_iter_rewind(struct rpc_xprt_iter *xpi)
{
rcu_read_lock();
xprt_iter_ops(xpi)->xpi_rewind(xpi);
rcu_read_unlock();
}
static void __xprt_iter_init(struct rpc_xprt_iter *xpi,
struct rpc_xprt_switch *xps,
const struct rpc_xprt_iter_ops *ops)
{
rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps));
xpi->xpi_cursor = NULL;
xpi->xpi_ops = ops;
}
/**
* xprt_iter_init - Initialise an xprt iterator
* @xpi: pointer to rpc_xprt_iter
* @xps: pointer to rpc_xprt_switch
*
* Initialises the iterator to use the default iterator ops
* as set in xps. This function is mainly intended for internal
* use in the rpc_client.
*/
void xprt_iter_init(struct rpc_xprt_iter *xpi,
struct rpc_xprt_switch *xps)
{
__xprt_iter_init(xpi, xps, NULL);
}
/**
* xprt_iter_init_listall - Initialise an xprt iterator
* @xpi: pointer to rpc_xprt_iter
* @xps: pointer to rpc_xprt_switch
*
* Initialises the iterator to iterate once through the entire list
* of entries in xps.
*/
void xprt_iter_init_listall(struct rpc_xprt_iter *xpi,
struct rpc_xprt_switch *xps)
{
__xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall);
}
/**
* xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch
* @xpi: pointer to rpc_xprt_iter
* @xps: pointer to a new rpc_xprt_switch or NULL
*
* Swaps out the existing xpi->xpi_xpswitch with a new value.
*/
struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi,
struct rpc_xprt_switch *newswitch)
{
struct rpc_xprt_switch __rcu *oldswitch;
/* Atomically swap out the old xpswitch */
oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch));
if (newswitch != NULL)
xprt_iter_rewind(xpi);
return rcu_dereference_protected(oldswitch, true);
}
/**
* xprt_iter_destroy - Destroys the xprt iterator
* @xpi pointer to rpc_xprt_iter
*/
void xprt_iter_destroy(struct rpc_xprt_iter *xpi)
{
xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL));
}
/**
* xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor
* @xpi: pointer to rpc_xprt_iter
*
* Returns a pointer to the struct rpc_xprt that is currently
* pointed to by the cursor.
* Caller must be holding rcu_read_lock().
*/
struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi)
{
WARN_ON_ONCE(!rcu_read_lock_held());
return xprt_iter_ops(xpi)->xpi_xprt(xpi);
}
static
struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi,
struct rpc_xprt *(*fn)(struct rpc_xprt_iter *))
{
struct rpc_xprt *ret;
do {
ret = fn(xpi);
if (ret == NULL)
break;
ret = xprt_get(ret);
} while (ret == NULL);
return ret;
}
/**
* xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor
* @xpi: pointer to rpc_xprt_iter
*
* Returns a reference to the struct rpc_xprt that is currently
* pointed to by the cursor.
*/
struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi)
{
struct rpc_xprt *xprt;
rcu_read_lock();
xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt);
rcu_read_unlock();
return xprt;
}
/**
* xprt_iter_get_next - Returns the next rpc_xprt following the cursor
* @xpi: pointer to rpc_xprt_iter
*
* Returns a reference to the struct rpc_xprt that immediately follows the
* entry pointed to by the cursor.
*/
struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi)
{
struct rpc_xprt *xprt;
rcu_read_lock();
xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next);
rcu_read_unlock();
return xprt;
}
/* Policy for always returning the first entry in the rpc_xprt_switch */
static
const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = {
.xpi_rewind = xprt_iter_no_rewind,
.xpi_xprt = xprt_iter_first_entry,
.xpi_next = xprt_iter_first_entry,
};
/* Policy for round-robin iteration of entries in the rpc_xprt_switch */
static
const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = {
.xpi_rewind = xprt_iter_default_rewind,
.xpi_xprt = xprt_iter_current_entry,
.xpi_next = xprt_iter_next_entry_roundrobin,
};
/* Policy for once-through iteration of entries in the rpc_xprt_switch */
static
const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = {
.xpi_rewind = xprt_iter_default_rewind,
.xpi_xprt = xprt_iter_current_entry,
.xpi_next = xprt_iter_next_entry_all,
};
......@@ -1844,9 +1844,7 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock)
*/
static void xs_local_rpcbind(struct rpc_task *task)
{
rcu_read_lock();
xprt_set_bound(rcu_dereference(task->tk_client->cl_xprt));
rcu_read_unlock();
xprt_set_bound(task->tk_xprt);
}
static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment