Commit bdcff415 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'ceph-for-4.9-rc2' of git://github.com/ceph/ceph-client

Pull Ceph fixes from Ilya Dryomov:
 "An rbd exclusive-lock edge case fix and several filesystem fixups.

  Nikolay's error path patch is tagged for stable, everything else but
  readdir vs frags race was introduced in this merge window"

* tag 'ceph-for-4.9-rc2' of git://github.com/ceph/ceph-client:
  ceph: fix non static symbol warning
  ceph: fix uninitialized dentry pointer in ceph_real_mount()
  ceph: fix readdir vs fragmentation race
  ceph: fix error handling in ceph_read_iter
  rbd: don't retry watch reregistration if header object is gone
  rbd: don't wait for the lock forever if blacklisted
parents 0ea67fae 5130ccea
......@@ -415,15 +415,15 @@ struct rbd_device {
};
/*
* Flag bits for rbd_dev->flags. If atomicity is required,
* rbd_dev->lock is used to protect access.
*
* Currently, only the "removing" flag (which is coupled with the
* "open_count" field) requires atomic access.
* Flag bits for rbd_dev->flags:
* - REMOVING (which is coupled with rbd_dev->open_count) is protected
* by rbd_dev->lock
* - BLACKLISTED is protected by rbd_dev->lock_rwsem
*/
enum rbd_dev_flags {
RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};
static DEFINE_MUTEX(client_mutex); /* Serialize client creation */
......@@ -3926,6 +3926,7 @@ static void rbd_reregister_watch(struct work_struct *work)
struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
struct rbd_device, watch_dwork);
bool was_lock_owner = false;
bool need_to_wake = false;
int ret;
dout("%s rbd_dev %p\n", __func__, rbd_dev);
......@@ -3935,19 +3936,27 @@ static void rbd_reregister_watch(struct work_struct *work)
was_lock_owner = rbd_release_lock(rbd_dev);
mutex_lock(&rbd_dev->watch_mutex);
if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR)
goto fail_unlock;
if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
mutex_unlock(&rbd_dev->watch_mutex);
goto out;
}
ret = __rbd_register_watch(rbd_dev);
if (ret) {
rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
if (ret != -EBLACKLISTED)
if (ret == -EBLACKLISTED || ret == -ENOENT) {
set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
need_to_wake = true;
} else {
queue_delayed_work(rbd_dev->task_wq,
&rbd_dev->watch_dwork,
RBD_RETRY_DELAY);
goto fail_unlock;
}
mutex_unlock(&rbd_dev->watch_mutex);
goto out;
}
need_to_wake = true;
rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
mutex_unlock(&rbd_dev->watch_mutex);
......@@ -3963,13 +3972,10 @@ static void rbd_reregister_watch(struct work_struct *work)
ret);
}
out:
up_write(&rbd_dev->lock_rwsem);
if (need_to_wake)
wake_requests(rbd_dev, true);
return;
fail_unlock:
mutex_unlock(&rbd_dev->watch_mutex);
up_write(&rbd_dev->lock_rwsem);
}
/*
......@@ -4074,7 +4080,9 @@ static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
up_read(&rbd_dev->lock_rwsem);
schedule();
down_read(&rbd_dev->lock_rwsem);
} while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
} while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
!test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
finish_wait(&rbd_dev->lock_waitq, &wait);
}
......@@ -4166,8 +4174,16 @@ static void rbd_queue_workfn(struct work_struct *work)
if (must_be_locked) {
down_read(&rbd_dev->lock_rwsem);
if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
!test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
rbd_wait_state_locked(rbd_dev);
WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^
!test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
result = -EBLACKLISTED;
goto err_unlock;
}
}
img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
......
......@@ -1272,6 +1272,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
statret = __ceph_do_getattr(inode, page,
CEPH_STAT_CAP_INLINE_DATA, !!page);
if (statret < 0) {
if (page)
__free_page(page);
if (statret == -ENODATA) {
BUG_ON(retry_op != READ_INLINE);
......
......@@ -1511,7 +1511,8 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
ceph_fill_dirfrag(d_inode(parent), rinfo->dir_dir);
}
if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) {
if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2 &&
!(rinfo->hash_order && req->r_path2)) {
/* note dir version at start of readdir so we can tell
* if any dentries get dropped */
req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);
......
......@@ -845,6 +845,8 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc)
err = ceph_fs_debugfs_init(fsc);
if (err < 0)
goto fail;
} else {
root = dget(fsc->sb->s_root);
}
fsc->mount_state = CEPH_MOUNT_MOUNTED;
......
......@@ -16,7 +16,7 @@
static int __remove_xattr(struct ceph_inode_info *ci,
struct ceph_inode_xattr *xattr);
const struct xattr_handler ceph_other_xattr_handler;
static const struct xattr_handler ceph_other_xattr_handler;
/*
* List of handlers for synthetic system.* attributes. Other
......@@ -1086,7 +1086,7 @@ static int ceph_set_xattr_handler(const struct xattr_handler *handler,
return __ceph_setxattr(inode, name, value, size, flags);
}
const struct xattr_handler ceph_other_xattr_handler = {
static const struct xattr_handler ceph_other_xattr_handler = {
.prefix = "", /* match any name => handlers called with full name */
.get = ceph_get_xattr_handler,
.set = ceph_set_xattr_handler,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment