Commit 61a70057 authored by Trond Myklebust's avatar Trond Myklebust

Fix up hangs with the upcall mechanism for RPCSEC_GSS and the

NFSv4 idmapper.

 - Correct bugs in idmapper wait code.
 - Correct layering bugs in RPCSEC_GSS and idmapper
 - Ensure we advance the file pointer in case of partial
   reads/writes of a message on the pipes
parent 8dd19f9f
......@@ -113,6 +113,7 @@ nfs_idmap_new(struct nfs_server *server)
init_MUTEX(&idmap->idmap_lock);
init_MUTEX(&idmap->idmap_im_lock);
init_MUTEX(&idmap->idmap_hash_lock);
init_waitqueue_head(&idmap->idmap_wq);
return (idmap);
......@@ -126,7 +127,10 @@ nfs_idmap_delete(struct nfs_server *server)
{
struct idmap *idmap = server->idmap;
if (!idmap)
return;
rpc_unlink(idmap->idmap_path);
server->idmap = NULL;
kfree(idmap);
}
......@@ -181,17 +185,17 @@ nfs_idmap_id(struct nfs_server *server, u_int8_t type, char *name,
msg.data = im;
msg.len = sizeof(*im);
init_waitqueue_head(&idmap->idmap_wq);
add_wait_queue(&idmap->idmap_wq, &wq);
set_current_state(TASK_UNINTERRUPTIBLE);
if (rpc_queue_upcall(idmap->idmap_dentry->d_inode, &msg) < 0) {
set_current_state(TASK_RUNNING);
remove_wait_queue(&idmap->idmap_wq, &wq);
goto out;
}
set_current_state(TASK_UNINTERRUPTIBLE);
up(&idmap->idmap_im_lock);
schedule();
current->state = TASK_RUNNING;
remove_wait_queue(&idmap->idmap_wq, &wq);
down(&idmap->idmap_im_lock);
/*
......@@ -252,20 +256,21 @@ nfs_idmap_name(struct nfs_server *server, u_int8_t type, uid_t id,
msg.data = im;
msg.len = sizeof(*im);
init_waitqueue_head(&idmap->idmap_wq);
add_wait_queue(&idmap->idmap_wq, &wq);
set_current_state(TASK_UNINTERRUPTIBLE);
if (rpc_queue_upcall(idmap->idmap_dentry->d_inode, &msg) < 0) {
set_current_state(TASK_RUNNING);
remove_wait_queue(&idmap->idmap_wq, &wq);
goto out;
}
/*
* XXX add timeouts here
*/
set_current_state(TASK_UNINTERRUPTIBLE);
up(&idmap->idmap_im_lock);
schedule();
current->state = TASK_RUNNING;
remove_wait_queue(&idmap->idmap_wq, &wq);
down(&idmap->idmap_im_lock);
if (im->im_status & IDMAP_STATUS_SUCCESS) {
......@@ -298,8 +303,14 @@ idmap_pipe_upcall(struct file *filp, struct rpc_pipe_msg *msg,
mlen = buflen;
left = copy_to_user(dst, data, mlen);
return (mlen - left);
if (left < 0) {
msg->errno = left;
return left;
}
mlen -= left;
msg->copied += mlen;
msg->errno = 0;
return mlen;
}
static ssize_t
......@@ -342,7 +353,6 @@ idmap_pipe_downcall(struct file *filp, const char *src, size_t mlen)
if (match) {
memcpy(im, &im_in, sizeof(*im));
wake_up(&idmap->idmap_wq);
__rpc_purge_current_upcall(filp);
} else if (!badmsg) {
hashtype = im_in.im_conv == IDMAP_CONV_IDTONAME ?
IDMAP_HASH_TYPE_ID : IDMAP_HASH_TYPE_NAME;
......@@ -361,6 +371,8 @@ idmap_pipe_destroy_msg(struct rpc_pipe_msg *msg)
struct idmap_msg *im = msg->data;
struct idmap *idmap = container_of(im, struct idmap, idmap_im);
if (msg->errno >= 0)
return;
down(&idmap->idmap_im_lock);
im->im_status = IDMAP_STATUS_LOOKUPFAIL;
wake_up(&idmap->idmap_wq);
......
......@@ -43,8 +43,5 @@ extern int rpc_rmdir(char *);
extern struct dentry *rpc_mkpipe(char *, void *, struct rpc_pipe_ops *, int flags);
extern int rpc_unlink(char *);
void __rpc_purge_current_upcall(struct file *);
void __rpc_purge_one_upcall(struct file *filp, struct rpc_pipe_msg *target);
#endif
#endif
......@@ -292,21 +292,59 @@ struct gss_upcall_msg {
static void
gss_release_msg(struct gss_upcall_msg *gss_msg)
{
if (atomic_dec_and_test(&gss_msg->count))
kfree(gss_msg);
struct gss_auth *gss_auth = gss_msg->auth;
if (!atomic_dec_and_lock(&gss_msg->count, &gss_auth->lock))
return;
if (!list_empty(&gss_msg->list))
list_del(&gss_msg->list);
spin_unlock(&gss_auth->lock);
kfree(gss_msg);
}
static struct gss_upcall_msg *
gss_find_upcall(struct gss_auth *gss_auth, uid_t uid)
__gss_find_upcall(struct gss_auth *gss_auth, uid_t uid)
{
struct gss_upcall_msg *pos;
list_for_each_entry(pos, &gss_auth->upcalls, list) {
if (pos->uid == uid)
return pos;
if (pos->uid != uid)
continue;
atomic_inc(&pos->count);
return pos;
}
return NULL;
}
static struct gss_upcall_msg *
gss_find_upcall(struct gss_auth *gss_auth, uid_t uid)
{
struct gss_upcall_msg *gss_msg;
spin_lock(&gss_auth->lock);
gss_msg = __gss_find_upcall(gss_auth, uid);
spin_unlock(&gss_auth->lock);
return gss_msg;
}
static void
__gss_unhash_msg(struct gss_upcall_msg *gss_msg)
{
if (list_empty(&gss_msg->list))
return;
list_del_init(&gss_msg->list);
rpc_wake_up(&gss_msg->waitq);
}
static void
gss_unhash_msg(struct gss_upcall_msg *gss_msg)
{
struct gss_auth *gss_auth = gss_msg->auth;
spin_lock(&gss_auth->lock);
__gss_unhash_msg(gss_msg);
spin_unlock(&gss_auth->lock);
}
static void
gss_release_callback(struct rpc_task *task)
{
......@@ -315,12 +353,10 @@ gss_release_callback(struct rpc_task *task)
struct gss_auth, rpc_auth);
struct gss_upcall_msg *gss_msg;
spin_lock(&gss_auth->lock);
gss_msg = gss_find_upcall(gss_auth, task->tk_msg.rpc_cred->cr_uid);
if (gss_msg) {
rpc_wake_up(&gss_msg->waitq);
}
spin_unlock(&gss_auth->lock);
BUG_ON(!gss_msg);
atomic_dec(&gss_msg->count);
gss_release_msg(gss_msg);
}
static int
......@@ -334,22 +370,22 @@ gss_upcall(struct rpc_clnt *clnt, struct rpc_task *task, uid_t uid)
int res;
retry:
gss_msg = gss_find_upcall(gss_auth, uid);
if (gss_msg == NULL && gss_new == NULL) {
gss_msg = __gss_find_upcall(gss_auth, uid);
if (gss_msg)
goto out_sleep;
if (gss_new == NULL) {
spin_unlock(&gss_auth->lock);
gss_new = kmalloc(sizeof(*gss_new), GFP_KERNEL);
spin_lock(&gss_auth->lock);
if (gss_new)
goto retry;
return -ENOMEM;
return -ENOMEM;
spin_lock(&gss_auth->lock);
goto retry;
}
if (gss_msg)
goto out_sleep;
gss_msg = gss_new;
memset(gss_new, 0, sizeof(*gss_new));
INIT_LIST_HEAD(&gss_new->list);
INIT_RPC_WAITQ(&gss_new->waitq, "RPCSEC_GSS upcall waitq");
atomic_set(&gss_new->count, 1);
atomic_set(&gss_new->count, 2);
msg = &gss_new->msg;
msg->data = &gss_new->uid;
msg->len = sizeof(gss_new->uid);
......@@ -361,15 +397,14 @@ gss_upcall(struct rpc_clnt *clnt, struct rpc_task *task, uid_t uid)
rpc_sleep_on(&gss_msg->waitq, task, gss_release_callback, NULL);
spin_unlock(&gss_auth->lock);
res = rpc_queue_upcall(dentry->d_inode, msg);
spin_lock(&gss_auth->lock);
if (res) {
rpc_wake_up(&gss_msg->waitq);
list_del(&gss_msg->list);
gss_unhash_msg(gss_msg);
gss_release_msg(gss_msg);
}
return res;
out_sleep:
rpc_sleep_on(&gss_msg->waitq, task, NULL, NULL);
rpc_sleep_on(&gss_msg->waitq, task, gss_release_callback, NULL);
spin_unlock(&gss_auth->lock);
if (gss_new)
kfree(gss_new);
return 0;
......@@ -386,7 +421,14 @@ gss_pipe_upcall(struct file *filp, struct rpc_pipe_msg *msg,
if (mlen > buflen)
mlen = buflen;
left = copy_to_user(dst, data, mlen);
return mlen - left;
if (left < 0) {
msg->errno = left;
return left;
}
mlen -= left;
msg->copied += mlen;
msg->errno = 0;
return mlen;
}
static ssize_t
......@@ -432,14 +474,13 @@ gss_pipe_downcall(struct file *filp, const char *src, size_t mlen)
else
gss_cred_set_ctx(cred, ctx);
spin_lock(&gss_auth->lock);
gss_msg = gss_find_upcall(gss_auth, acred.uid);
gss_msg = __gss_find_upcall(gss_auth, acred.uid);
if (gss_msg) {
list_del(&gss_msg->list);
__rpc_purge_one_upcall(filp, &gss_msg->msg);
rpc_wake_up(&gss_msg->waitq);
__gss_unhash_msg(gss_msg);
spin_unlock(&gss_auth->lock);
gss_release_msg(gss_msg);
}
spin_unlock(&gss_auth->lock);
} else
spin_unlock(&gss_auth->lock);
rpc_release_client(clnt);
return mlen;
err:
......@@ -454,13 +495,10 @@ void
gss_pipe_destroy_msg(struct rpc_pipe_msg *msg)
{
struct gss_upcall_msg *gss_msg = container_of(msg, struct gss_upcall_msg, msg);
struct gss_auth *gss_auth = gss_msg->auth;
spin_lock(&gss_auth->lock);
list_del(&gss_msg->list);
rpc_wake_up(&gss_msg->waitq);
if (msg->errno < 0)
gss_unhash_msg(gss_msg);
gss_release_msg(gss_msg);
spin_unlock(&gss_auth->lock);
}
/*
......
......@@ -59,43 +59,6 @@ rpc_purge_upcall(struct inode *inode, int err)
up(&inode->i_sem);
}
/*
* XXX should only be called in ->downcall
*/
void
__rpc_purge_current_upcall(struct file *filp)
{
struct rpc_pipe_msg *msg;
msg = filp->private_data;
filp->private_data = NULL;
if (msg != NULL)
msg->errno = 0;
}
void
__rpc_purge_one_upcall(struct file *filp, struct rpc_pipe_msg *target)
{
struct rpc_inode *rpci = RPC_I(filp->f_dentry->d_inode);
struct rpc_pipe_msg *msg;
msg = filp->private_data;
if (msg == target) {
filp->private_data = NULL;
goto found;
}
list_for_each_entry(msg, &rpci->pipe, list) {
if (msg == target) {
list_del(&msg->list);
goto found;
}
}
BUG();
found:
return;
}
int
rpc_queue_upcall(struct inode *inode, struct rpc_pipe_msg *msg)
{
......@@ -198,14 +161,15 @@ rpc_pipe_read(struct file *filp, char __user *buf, size_t len, loff_t *offset)
list_del_init(&msg->list);
rpci->pipelen -= msg->len;
filp->private_data = msg;
msg->copied = 0;
}
if (msg == NULL)
goto out_unlock;
}
/* NOTE: it is up to the callback to update msg->copied */
res = rpci->ops->upcall(filp, msg, buf, len);
if (res < 0 || msg->len == msg->copied) {
filp->private_data = NULL;
msg->errno = 0;
rpci->ops->destroy_msg(msg);
}
out_unlock:
......@@ -685,7 +649,7 @@ rpc_mkpipe(char *path, void *private, struct rpc_pipe_ops *ops, int flags)
if (IS_ERR(dentry))
return dentry;
dir = nd.dentry->d_inode;
inode = rpc_get_inode(dir->i_sb, S_IFSOCK | S_IRUSR | S_IXUSR);
inode = rpc_get_inode(dir->i_sb, S_IFSOCK | S_IRUSR | S_IWUSR);
if (!inode)
goto err_dput;
inode->i_ino = iunique(dir->i_sb, 100);
......
......@@ -56,8 +56,6 @@ EXPORT_SYMBOL(rpc_unlink);
EXPORT_SYMBOL(rpc_wake_up);
EXPORT_SYMBOL(rpc_queue_upcall);
EXPORT_SYMBOL(rpc_mkpipe);
EXPORT_SYMBOL(__rpc_purge_current_upcall);
EXPORT_SYMBOL(__rpc_purge_one_upcall);
/* Client transport */
EXPORT_SYMBOL(xprt_create_proto);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment