Commit 5d6451b1 authored by Jeff Layton's avatar Jeff Layton Committed by Ilya Dryomov

ceph: shut down access to inode when async create fails

Add proper error handling for when an async create fails. The inode
never existed, so any dirty caps or data are now toast. We already
d_drop the dentry in that case, but the now-stale inode may still be
around. We want to shut down access to these inodes, and ensure that
they can't harbor any more dirty data, which can cause problems at
umount time.

When this occurs, flag such inodes as being SHUTDOWN, and trash any caps
and cap flushes that may be in flight for them, and invalidate the
pagecache for the inode. Add a new helper that can check whether an
inode or an entire mount is now shut down, and call it instead of
accessing the mount_state directly in places where we test that now.

URL: https://tracker.ceph.com/issues/51279Signed-off-by: default avatarJeff Layton <jlayton@kernel.org>
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 36e6da98
......@@ -724,7 +724,7 @@ static int ceph_writepages_start(struct address_space *mapping,
wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
(wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
if (ceph_inode_is_shutdown(inode)) {
if (ci->i_wrbuffer_ref > 0) {
pr_warn_ratelimited(
"writepage_start %p %lld forced umount\n",
......@@ -1145,12 +1145,12 @@ static struct ceph_snap_context *
ceph_find_incompatible(struct page *page)
{
struct inode *inode = page->mapping->host;
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_inode_info *ci = ceph_inode(inode);
if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
dout(" page %p forced umount\n", page);
return ERR_PTR(-EIO);
if (ceph_inode_is_shutdown(inode)) {
dout(" page %p %llx:%llx is shutdown\n", page,
ceph_vinop(inode));
return ERR_PTR(-ESTALE);
}
for (;;) {
......@@ -1345,6 +1345,9 @@ static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
sigset_t oldset;
vm_fault_t ret = VM_FAULT_SIGBUS;
if (ceph_inode_is_shutdown(inode))
return ret;
ceph_block_sigs(&oldset);
dout("filemap_fault %p %llx.%llx %llu trying to get caps\n",
......@@ -1436,6 +1439,9 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
sigset_t oldset;
vm_fault_t ret = VM_FAULT_SIGBUS;
if (ceph_inode_is_shutdown(inode))
return ret;
prealloc_cf = ceph_alloc_cap_flush();
if (!prealloc_cf)
return VM_FAULT_OOM;
......
......@@ -1188,11 +1188,11 @@ void ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
lockdep_assert_held(&ci->i_ceph_lock);
fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
fsc = ceph_inode_to_client(&ci->vfs_inode);
WARN_ON_ONCE(ci->i_auth_cap == cap &&
!list_empty(&ci->i_dirty_item) &&
!fsc->blocklisted &&
READ_ONCE(fsc->mount_state) != CEPH_MOUNT_SHUTDOWN);
!ceph_inode_is_shutdown(&ci->vfs_inode));
__ceph_remove_cap(cap, queue_release);
}
......@@ -2750,9 +2750,9 @@ static int try_get_cap_refs(struct inode *inode, int need, int want,
goto out_unlock;
}
if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
dout("get_cap_refs %p forced umount\n", inode);
ret = -EIO;
if (ceph_inode_is_shutdown(inode)) {
dout("get_cap_refs %p inode is shutdown\n", inode);
ret = -ESTALE;
goto out_unlock;
}
mds_wanted = __ceph_caps_mds_wanted(ci, false);
......@@ -4604,7 +4604,7 @@ int ceph_purge_inode_cap(struct inode *inode, struct ceph_cap *cap, bool *invali
if (is_auth) {
struct ceph_cap_flush *cf;
if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
if (ceph_inode_is_shutdown(inode)) {
if (inode->i_data.nrpages > 0)
*invalidate = true;
if (ci->i_wrbuffer_ref > 0)
......
......@@ -157,6 +157,11 @@ static struct inode *__lookup_inode(struct super_block *sb, u64 ino)
ceph_mdsc_put_request(req);
if (!inode)
return err < 0 ? ERR_PTR(err) : ERR_PTR(-ESTALE);
} else {
if (ceph_inode_is_shutdown(inode)) {
iput(inode);
return ERR_PTR(-ESTALE);
}
}
return inode;
}
......@@ -223,8 +228,13 @@ static struct dentry *__snapfh_to_dentry(struct super_block *sb,
return ERR_PTR(-ESTALE);
inode = ceph_find_inode(sb, vino);
if (inode)
if (inode) {
if (ceph_inode_is_shutdown(inode)) {
iput(inode);
return ERR_PTR(-ESTALE);
}
return d_obtain_alias(inode);
}
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPINO,
USE_ANY_MDS);
......
......@@ -525,6 +525,7 @@ static void ceph_async_create_cb(struct ceph_mds_client *mdsc,
if (result) {
struct dentry *dentry = req->r_dentry;
struct inode *inode = d_inode(dentry);
int pathlen = 0;
u64 base = 0;
char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
......@@ -534,7 +535,8 @@ static void ceph_async_create_cb(struct ceph_mds_client *mdsc,
if (!d_unhashed(dentry))
d_drop(dentry);
/* FIXME: start returning I/O errors on all accesses? */
ceph_inode_shutdown(inode);
pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n",
base, IS_ERR(path) ? "<<bad>>" : path, result);
ceph_mdsc_free_path(path, pathlen);
......@@ -1526,6 +1528,9 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
if (ceph_inode_is_shutdown(inode))
return -ESTALE;
if (direct_lock)
ceph_start_io_direct(inode);
else
......@@ -1678,6 +1683,9 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
loff_t pos;
loff_t limit = max(i_size_read(inode), fsc->max_file_size);
if (ceph_inode_is_shutdown(inode))
return -ESTALE;
if (ceph_snap(inode) != CEPH_NOSNAP)
return -EROFS;
......
......@@ -1841,13 +1841,12 @@ void ceph_queue_inode_work(struct inode *inode, int work_bit)
static void ceph_do_invalidate_pages(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
u32 orig_gen;
int check = 0;
mutex_lock(&ci->i_truncate_mutex);
if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
if (ceph_inode_is_shutdown(inode)) {
pr_warn_ratelimited("%s: inode %llx.%llx is shut down\n",
__func__, ceph_vinop(inode));
mapping_set_error(inode->i_mapping, -EIO);
......@@ -2218,6 +2217,9 @@ int ceph_setattr(struct user_namespace *mnt_userns, struct dentry *dentry,
if (ceph_snap(inode) != CEPH_NOSNAP)
return -EROFS;
if (ceph_inode_is_shutdown(inode))
return -ESTALE;
err = setattr_prepare(&init_user_ns, dentry, attr);
if (err != 0)
return err;
......@@ -2348,6 +2350,9 @@ int ceph_getattr(struct user_namespace *mnt_userns, const struct path *path,
u32 valid_mask = STATX_BASIC_STATS;
int err = 0;
if (ceph_inode_is_shutdown(inode))
return -ESTALE;
/* Skip the getattr altogether if we're asked not to sync */
if (!(flags & AT_STATX_DONT_SYNC)) {
err = ceph_do_getattr(inode,
......@@ -2395,3 +2400,27 @@ int ceph_getattr(struct user_namespace *mnt_userns, const struct path *path,
stat->result_mask = request_mask & valid_mask;
return err;
}
void ceph_inode_shutdown(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct rb_node *p;
int iputs = 0;
bool invalidate = false;
spin_lock(&ci->i_ceph_lock);
ci->i_ceph_flags |= CEPH_I_SHUTDOWN;
p = rb_first(&ci->i_caps);
while (p) {
struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
p = rb_next(p);
iputs += ceph_purge_inode_cap(inode, cap, &invalidate);
}
spin_unlock(&ci->i_ceph_lock);
if (invalidate)
ceph_queue_invalidate(inode);
while (iputs--)
iput(inode);
}
......@@ -241,6 +241,9 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
if (!(fl->fl_flags & FL_POSIX))
return -ENOLCK;
if (ceph_inode_is_shutdown(inode))
return -ESTALE;
dout("ceph_lock, fl_owner: %p\n", fl->fl_owner);
/* set wait bit as appropriate, then make command as Ceph expects it*/
......@@ -306,6 +309,9 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
if (fl->fl_type & LOCK_MAND)
return -EOPNOTSUPP;
if (ceph_inode_is_shutdown(inode))
return -ESTALE;
dout("ceph_flock, fl_file: %p\n", fl->fl_file);
spin_lock(&ci->i_ceph_lock);
......
......@@ -581,6 +581,7 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
#define CEPH_I_ODIRECT (1 << 11) /* inode in direct I/O mode */
#define CEPH_ASYNC_CREATE_BIT (12) /* async create in flight for this */
#define CEPH_I_ASYNC_CREATE (1 << CEPH_ASYNC_CREATE_BIT)
#define CEPH_I_SHUTDOWN (1 << 13) /* inode is no longer usable */
/*
* Masks of ceph inode work.
......@@ -1028,6 +1029,16 @@ extern int ceph_setattr(struct user_namespace *mnt_userns,
extern int ceph_getattr(struct user_namespace *mnt_userns,
const struct path *path, struct kstat *stat,
u32 request_mask, unsigned int flags);
void ceph_inode_shutdown(struct inode *inode);
static inline bool ceph_inode_is_shutdown(struct inode *inode)
{
unsigned long flags = READ_ONCE(ceph_inode(inode)->i_ceph_flags);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
int state = READ_ONCE(fsc->mount_state);
return (flags & CEPH_I_SHUTDOWN) || state >= CEPH_MOUNT_SHUTDOWN;
}
/* xattr.c */
int __ceph_setxattr(struct inode *, const char *, const void *, size_t, int);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment