Commit 4593f3c2 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'ceph-for-6.5-rc5' of https://github.com/ceph/ceph-client

Pull ceph fixes from Ilya Dryomov:
 "Two patches to improve RBD exclusive lock interaction with
  osd_request_timeout option and another fix to reduce the potential for
  erroneous blocklisting -- this time in CephFS. All going to stable"

* tag 'ceph-for-6.5-rc5' of https://github.com/ceph/ceph-client:
  libceph: fix potential hang in ceph_osdc_notify()
  rbd: prevent busy loop when requesting exclusive lock
  ceph: defer stopping mdsc delayed_work
parents 79796425 e6e28432
...@@ -3675,7 +3675,7 @@ static int rbd_lock(struct rbd_device *rbd_dev) ...@@ -3675,7 +3675,7 @@ static int rbd_lock(struct rbd_device *rbd_dev)
ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie, RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
RBD_LOCK_TAG, "", 0); RBD_LOCK_TAG, "", 0);
if (ret) if (ret && ret != -EEXIST)
return ret; return ret;
__rbd_lock(rbd_dev, cookie); __rbd_lock(rbd_dev, cookie);
...@@ -3878,7 +3878,7 @@ static struct ceph_locker *get_lock_owner_info(struct rbd_device *rbd_dev) ...@@ -3878,7 +3878,7 @@ static struct ceph_locker *get_lock_owner_info(struct rbd_device *rbd_dev)
&rbd_dev->header_oloc, RBD_LOCK_NAME, &rbd_dev->header_oloc, RBD_LOCK_NAME,
&lock_type, &lock_tag, &lockers, &num_lockers); &lock_type, &lock_tag, &lockers, &num_lockers);
if (ret) { if (ret) {
rbd_warn(rbd_dev, "failed to retrieve lockers: %d", ret); rbd_warn(rbd_dev, "failed to get header lockers: %d", ret);
return ERR_PTR(ret); return ERR_PTR(ret);
} }
...@@ -3940,8 +3940,10 @@ static int find_watcher(struct rbd_device *rbd_dev, ...@@ -3940,8 +3940,10 @@ static int find_watcher(struct rbd_device *rbd_dev,
ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid, ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, &watchers, &rbd_dev->header_oloc, &watchers,
&num_watchers); &num_watchers);
if (ret) if (ret) {
rbd_warn(rbd_dev, "failed to get watchers: %d", ret);
return ret; return ret;
}
sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie); sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
for (i = 0; i < num_watchers; i++) { for (i = 0; i < num_watchers; i++) {
...@@ -3985,8 +3987,12 @@ static int rbd_try_lock(struct rbd_device *rbd_dev) ...@@ -3985,8 +3987,12 @@ static int rbd_try_lock(struct rbd_device *rbd_dev)
locker = refreshed_locker = NULL; locker = refreshed_locker = NULL;
ret = rbd_lock(rbd_dev); ret = rbd_lock(rbd_dev);
if (ret != -EBUSY) if (!ret)
goto out;
if (ret != -EBUSY) {
rbd_warn(rbd_dev, "failed to lock header: %d", ret);
goto out; goto out;
}
/* determine if the current lock holder is still alive */ /* determine if the current lock holder is still alive */
locker = get_lock_owner_info(rbd_dev); locker = get_lock_owner_info(rbd_dev);
...@@ -4089,11 +4095,8 @@ static int rbd_try_acquire_lock(struct rbd_device *rbd_dev) ...@@ -4089,11 +4095,8 @@ static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
ret = rbd_try_lock(rbd_dev); ret = rbd_try_lock(rbd_dev);
if (ret < 0) { if (ret < 0) {
rbd_warn(rbd_dev, "failed to lock header: %d", ret); rbd_warn(rbd_dev, "failed to acquire lock: %d", ret);
if (ret == -EBLOCKLISTED)
goto out; goto out;
ret = 1; /* request lock anyway */
} }
if (ret > 0) { if (ret > 0) {
up_write(&rbd_dev->lock_rwsem); up_write(&rbd_dev->lock_rwsem);
...@@ -6627,12 +6630,11 @@ static int rbd_add_acquire_lock(struct rbd_device *rbd_dev) ...@@ -6627,12 +6630,11 @@ static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
cancel_delayed_work_sync(&rbd_dev->lock_dwork); cancel_delayed_work_sync(&rbd_dev->lock_dwork);
if (!ret) if (!ret)
ret = -ETIMEDOUT; ret = -ETIMEDOUT;
}
if (ret) { rbd_warn(rbd_dev, "failed to acquire lock: %ld", ret);
rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
return ret;
} }
if (ret)
return ret;
/* /*
* The lock may have been released by now, unless automatic lock * The lock may have been released by now, unless automatic lock
......
...@@ -4764,7 +4764,7 @@ static void delayed_work(struct work_struct *work) ...@@ -4764,7 +4764,7 @@ static void delayed_work(struct work_struct *work)
dout("mdsc delayed_work\n"); dout("mdsc delayed_work\n");
if (mdsc->stopping) if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
return; return;
mutex_lock(&mdsc->mutex); mutex_lock(&mdsc->mutex);
...@@ -4943,7 +4943,7 @@ void send_flush_mdlog(struct ceph_mds_session *s) ...@@ -4943,7 +4943,7 @@ void send_flush_mdlog(struct ceph_mds_session *s)
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{ {
dout("pre_umount\n"); dout("pre_umount\n");
mdsc->stopping = 1; mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;
ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true); ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false); ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
......
...@@ -380,6 +380,11 @@ struct cap_wait { ...@@ -380,6 +380,11 @@ struct cap_wait {
int want; int want;
}; };
enum {
CEPH_MDSC_STOPPING_BEGIN = 1,
CEPH_MDSC_STOPPING_FLUSHED = 2,
};
/* /*
* mds client state * mds client state
*/ */
......
...@@ -1374,6 +1374,16 @@ static void ceph_kill_sb(struct super_block *s) ...@@ -1374,6 +1374,16 @@ static void ceph_kill_sb(struct super_block *s)
ceph_mdsc_pre_umount(fsc->mdsc); ceph_mdsc_pre_umount(fsc->mdsc);
flush_fs_workqueues(fsc); flush_fs_workqueues(fsc);
/*
* Though the kill_anon_super() will finally trigger the
* sync_filesystem() anyway, we still need to do it here
* and then bump the stage of shutdown to stop the work
* queue as earlier as possible.
*/
sync_filesystem(s);
fsc->mdsc->stopping = CEPH_MDSC_STOPPING_FLUSHED;
kill_anon_super(s); kill_anon_super(s);
fsc->client->extra_mon_dispatch = NULL; fsc->client->extra_mon_dispatch = NULL;
......
...@@ -3334,17 +3334,24 @@ static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq) ...@@ -3334,17 +3334,24 @@ static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
int ret; int ret;
dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
ret = wait_for_completion_interruptible(&lreq->reg_commit_wait); ret = wait_for_completion_killable(&lreq->reg_commit_wait);
return ret ?: lreq->reg_commit_error; return ret ?: lreq->reg_commit_error;
} }
static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq) static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq,
unsigned long timeout)
{ {
int ret; long left;
dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
ret = wait_for_completion_interruptible(&lreq->notify_finish_wait); left = wait_for_completion_killable_timeout(&lreq->notify_finish_wait,
return ret ?: lreq->notify_finish_error; ceph_timeout_jiffies(timeout));
if (left <= 0)
left = left ?: -ETIMEDOUT;
else
left = lreq->notify_finish_error; /* completed */
return left;
} }
/* /*
...@@ -4896,7 +4903,8 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc, ...@@ -4896,7 +4903,8 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
linger_submit(lreq); linger_submit(lreq);
ret = linger_reg_commit_wait(lreq); ret = linger_reg_commit_wait(lreq);
if (!ret) if (!ret)
ret = linger_notify_finish_wait(lreq); ret = linger_notify_finish_wait(lreq,
msecs_to_jiffies(2 * timeout * MSEC_PER_SEC));
else else
dout("lreq %p failed to initiate notify %d\n", lreq, ret); dout("lreq %p failed to initiate notify %d\n", lreq, ret);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment