Commit 620563dd authored by J. Bruce Fields, committed by Linus Torvalds

This makes several changes to the gss upcalls:

  1. Currently rpc_queue_upcall returns -EPIPE if we make an upcall on a pipe
     that userland hasn't opened yet, and we time out and retry later.  This
     can lead to an unnecessary delay on mount, because rpc.gssd is racing
     to open the newly created pipe while the nfs code is making the first
     upcall.  If rpc.gssd loses, we end up with a delay equal to the length
     of the timeout.  So instead we allow rpc_queue_upcall to queue upcalls
     on pipes that aren't open yet.  To deal with other upcall users (e.g.,
     the name<->uid mapping upcall code) that do want to know when the pipe
     isn't open (in the name<->uid case you can choose to just map everyone
     to nobody if the user doesn't want to run idmapd), we add a flag
     parameter to rpc_mkpipe that lets us choose the behavior we want at the
     time we create the pipe (see the first sketch after this list).

  2. Currently gss_msgs are destroyed the moment they have been completely
     read (by the call to destroy_msg in rpc_pipe_read).  This means an
     rpc_wake_up is done then and can't be done later (because the gss_msg
     is gone, along with gss_msg->waitq).  It will typically be some time
     yet before the downcall comes, so the woken-up processes have to wait
     and retry later; as above, this leads to unnecessary delays.  Also,
     since the gss_msg is deleted from the list of gss_msgs, we forget that
     an upcall to get creds for the user in question is still pending, so
     multiple unnecessary upcalls will be made.  This patch changes
     gss_pipe_upcall to never update msg->copied, so that rpc_pipe_read
     never destroys the message.  Instead, we wait until a downcall arrives
     to remove the upcall, using the new function __rpc_purge_one_upcall,
     which searches the list of pending rpc_pipe_msgs on the inode as well
     as checking the current upcall, to deal with the case where rpc.gssd
     might preemptively create a context for a user that already has a
     pending upcall.  Note also that this means repeated reads by rpc.gssd
     will return the same data until rpc.gssd does a downcall.  This also
     gives us a better chance of recovering from rpc.gssd crashes (see the
     second sketch after this list).
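To make (1) concrete, here is a condensed sketch pieced together from the
hunks below (context trimmed, not complete functions; the -EPIPE fallback in
the else branch is inferred from the description above rather than visible in
the hunk): the per-pipe flag chosen at rpc_mkpipe() time decides whether
rpc_queue_upcall() queues a message on a not-yet-opened pipe or fails.

      /* gss_create(): gss upcalls may be queued before rpc.gssd opens the pipe */
      gss_auth->dentry = rpc_mkpipe(gss_auth->path, clnt, &gss_upcall_ops,
                                    RPC_PIPE_WAIT_FOR_OPEN);

      /* nfs_idmap_new(): idmap keeps the old behavior and passes 0, so the
       * caller still learns (via -EPIPE) that idmapd isn't running */
      idmap->idmap_dentry = rpc_mkpipe(idmap->idmap_path,
                      idmap->idmap_server, &idmap_upcall_ops, 0);

      /* rpc_queue_upcall(): queue if a reader has the pipe open, or if the
       * pipe was created with RPC_PIPE_WAIT_FOR_OPEN; otherwise fail */
      if (rpci->nreaders || (rpci->flags & RPC_PIPE_WAIT_FOR_OPEN)) {
              list_add_tail(&msg->list, &rpci->pipe);
              rpci->pipelen += msg->len;
      } else
              res = -EPIPE;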
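And for (2), again condensed from the hunks below (the static ssize_t return
type is filled in from context, not shown in the hunk): gss_pipe_upcall() no
longer advances msg->copied, so rpc_pipe_read() never treats the message as
fully consumed and never destroys it; the upcall is only removed once the
matching downcall arrives in gss_pipe_downcall().

      /* gss_pipe_upcall() after the patch: msg->copied is never advanced
       * here, so every read by rpc.gssd returns the same upcall data */
      static ssize_t
      gss_pipe_upcall(struct file *filp, struct rpc_pipe_msg *msg,
                      char *dst, size_t buflen)
      {
              char *data = (char *)msg->data + msg->copied;
              ssize_t mlen = msg->len;
              ssize_t left;

              if (mlen > buflen)
                      mlen = buflen;
              left = copy_to_user(dst, data, mlen);
              return mlen - left;
      }

      /* gss_pipe_downcall(): the pending upcall is dropped from the gss_auth
       * list, purged from the pipe, and released only now */
      gss_msg = gss_find_upcall(gss_auth, acred.uid);
      if (gss_msg) {
              list_del(&gss_msg->list);
              __rpc_purge_one_upcall(filp, &gss_msg->msg);
              rpc_wake_up(&gss_msg->waitq);
              gss_release_msg(gss_msg);
      }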
parent 315a0e34
@@ -106,7 +106,7 @@ nfs_idmap_new(struct nfs_server *server)
 			"%s/idmap", idmap->idmap_server->client->cl_pathname);
 	idmap->idmap_dentry = rpc_mkpipe(idmap->idmap_path,
-			idmap->idmap_server, &idmap_upcall_ops);
+			idmap->idmap_server, &idmap_upcall_ops, 0);
 	if (IS_ERR(idmap->idmap_dentry))
 		goto err_free;
@@ -24,6 +24,8 @@ struct rpc_inode {
 	int pipelen;
 	int nreaders;
 	wait_queue_head_t waitq;
+#define RPC_PIPE_WAIT_FOR_OPEN 1
+	int flags;
 	struct rpc_pipe_ops *ops;
 };
@@ -38,10 +40,11 @@ extern int rpc_queue_upcall(struct inode *, struct rpc_pipe_msg *);
 extern struct dentry *rpc_mkdir(char *, struct rpc_clnt *);
 extern int rpc_rmdir(char *);
-extern struct dentry *rpc_mkpipe(char *, void *, struct rpc_pipe_ops *);
+extern struct dentry *rpc_mkpipe(char *, void *, struct rpc_pipe_ops *, int flags);
 extern int rpc_unlink(char *);
 void __rpc_purge_current_upcall(struct file *);
+void __rpc_purge_one_upcall(struct file *filp, struct rpc_pipe_msg *target);
 #endif
 #endif
@@ -283,6 +283,7 @@ gss_parse_init_downcall(struct gss_api_mech *gm, struct xdr_netobj *buf,
 struct gss_upcall_msg {
 	struct rpc_pipe_msg msg;
 	struct list_head list;
+	struct gss_auth *auth;
 	struct rpc_wait_queue waitq;
 	uid_t uid;
 	atomic_t count;
@@ -318,8 +319,6 @@ gss_release_callback(struct rpc_task *task)
 	gss_msg = gss_find_upcall(gss_auth, task->tk_msg.rpc_cred->cr_uid);
 	if (gss_msg) {
 		rpc_wake_up(&gss_msg->waitq);
-		list_del(&gss_msg->list);
-		gss_release_msg(gss_msg);
 	}
 	spin_unlock(&gss_auth->lock);
 }
@@ -350,11 +349,12 @@ gss_upcall(struct rpc_clnt *clnt, struct rpc_task *task, uid_t uid)
 	memset(gss_new, 0, sizeof(*gss_new));
 	INIT_LIST_HEAD(&gss_new->list);
 	INIT_RPC_WAITQ(&gss_new->waitq, "RPCSEC_GSS upcall waitq");
-	atomic_set(&gss_new->count, 2);
+	atomic_set(&gss_new->count, 1);
 	msg = &gss_new->msg;
 	msg->data = &gss_new->uid;
 	msg->len = sizeof(gss_new->uid);
 	gss_new->uid = uid;
+	gss_new->auth = gss_auth;
 	list_add(&gss_new->list, &gss_auth->upcalls);
 	gss_new = NULL;
 	task->tk_timeout = 5 * HZ;
@@ -362,8 +362,11 @@ gss_upcall(struct rpc_clnt *clnt, struct rpc_task *task, uid_t uid)
 	spin_unlock(&gss_auth->lock);
 	res = rpc_queue_upcall(dentry->d_inode, msg);
 	spin_lock(&gss_auth->lock);
-	if (res)
+	if (res) {
 		rpc_wake_up(&gss_msg->waitq);
+		list_del(&gss_msg->list);
+		gss_release_msg(gss_msg);
+	}
 	return res;
 out_sleep:
 	rpc_sleep_on(&gss_msg->waitq, task, NULL, NULL);
@@ -377,13 +380,12 @@ gss_pipe_upcall(struct file *filp, struct rpc_pipe_msg *msg,
 		char *dst, size_t buflen)
 {
 	char *data = (char *)msg->data + msg->copied;
-	ssize_t mlen = msg->len - msg->copied;
+	ssize_t mlen = msg->len;
 	ssize_t left;
 	if (mlen > buflen)
 		mlen = buflen;
 	left = copy_to_user(dst, data, mlen);
-	msg->copied += mlen - left;
 	return mlen - left;
 }
@@ -431,8 +433,12 @@ gss_pipe_downcall(struct file *filp, const char *src, size_t mlen)
 	gss_cred_set_ctx(cred, ctx);
 	spin_lock(&gss_auth->lock);
 	gss_msg = gss_find_upcall(gss_auth, acred.uid);
-	if (gss_msg)
+	if (gss_msg) {
+		list_del(&gss_msg->list);
+		__rpc_purge_one_upcall(filp, &gss_msg->msg);
 		rpc_wake_up(&gss_msg->waitq);
+		gss_release_msg(gss_msg);
+	}
 	spin_unlock(&gss_auth->lock);
 	rpc_release_client(clnt);
 	return mlen;
@@ -448,9 +454,13 @@ void
 gss_pipe_destroy_msg(struct rpc_pipe_msg *msg)
 {
 	struct gss_upcall_msg *gss_msg = container_of(msg, struct gss_upcall_msg, msg);
+	struct gss_auth *gss_auth = gss_msg->auth;
+	spin_lock(&gss_auth->lock);
 	list_del(&gss_msg->list);
 	rpc_wake_up(&gss_msg->waitq);
 	gss_release_msg(gss_msg);
+	spin_unlock(&gss_auth->lock);
 }
 /*
@@ -486,7 +496,7 @@ gss_create(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
 	snprintf(gss_auth->path, sizeof(gss_auth->path), "%s/%s",
 			clnt->cl_pathname,
 			gss_auth->mech->gm_ops->name);
-	gss_auth->dentry = rpc_mkpipe(gss_auth->path, clnt, &gss_upcall_ops);
+	gss_auth->dentry = rpc_mkpipe(gss_auth->path, clnt, &gss_upcall_ops, RPC_PIPE_WAIT_FOR_OPEN);
 	if (IS_ERR(gss_auth->dentry))
 		goto err_free;
@@ -75,6 +75,28 @@ __rpc_purge_current_upcall(struct file *filp)
 	msg->errno = 0;
 }
+void
+__rpc_purge_one_upcall(struct file *filp, struct rpc_pipe_msg *target)
+{
+	struct rpc_inode *rpci = RPC_I(filp->f_dentry->d_inode);
+	struct rpc_pipe_msg *msg;
+	msg = filp->private_data;
+	if (msg == target) {
+		filp->private_data = NULL;
+		goto found;
+	}
+	list_for_each_entry(msg, &rpci->pipe, list) {
+		if (msg == target) {
+			list_del(&msg->list);
+			goto found;
+		}
+	}
+	BUG();
+found:
+	return;
+}
 int
 rpc_queue_upcall(struct inode *inode, struct rpc_pipe_msg *msg)
 {
@@ -82,7 +104,7 @@ rpc_queue_upcall(struct inode *inode, struct rpc_pipe_msg *msg)
 	int res = 0;
 	down(&inode->i_sem);
-	if (rpci->nreaders) {
+	if (rpci->nreaders || (rpci->flags & RPC_PIPE_WAIT_FOR_OPEN)) {
 		list_add_tail(&msg->list, &rpci->pipe);
 		rpci->pipelen += msg->len;
 	} else
@@ -149,7 +171,7 @@ rpc_pipe_release(struct inode *inode, struct file *filp)
 	down(&inode->i_sem);
 	if (filp->f_mode & FMODE_READ)
 		rpci->nreaders --;
-	if (!rpci->nreaders)
+	if (!rpci->nreaders && !(rpci->flags & RPC_PIPE_WAIT_FOR_OPEN))
 		__rpc_purge_upcall(inode, -EPIPE);
 	up(&inode->i_sem);
 	return 0;
@@ -646,7 +668,7 @@ rpc_rmdir(char *path)
 }
 struct dentry *
-rpc_mkpipe(char *path, void *private, struct rpc_pipe_ops *ops)
+rpc_mkpipe(char *path, void *private, struct rpc_pipe_ops *ops, int flags)
 {
 	struct nameidata nd;
 	struct dentry *dentry;
@@ -665,6 +687,7 @@ rpc_mkpipe(char *path, void *private, struct rpc_pipe_ops *ops)
 	d_instantiate(dentry, inode);
 	rpci = RPC_I(inode);
 	rpci->private = private;
+	rpci->flags = flags;
 	rpci->ops = ops;
 	inode_dir_notify(dir, DN_CREATE);
 out:
@@ -57,6 +57,7 @@ EXPORT_SYMBOL(rpc_wake_up);
 EXPORT_SYMBOL(rpc_queue_upcall);
 EXPORT_SYMBOL(rpc_mkpipe);
 EXPORT_SYMBOL(__rpc_purge_current_upcall);
+EXPORT_SYMBOL(__rpc_purge_one_upcall);
 /* Client transport */
 EXPORT_SYMBOL(xprt_create_proto);