Commit 9d01e07f authored by Ilya Dryomov's avatar Ilya Dryomov

rbd: prevent busy loop when requesting exclusive lock

Due to rbd_try_acquire_lock() effectively swallowing all but
EBLOCKLISTED error from rbd_try_lock() ("request lock anyway") and
rbd_request_lock() returning ETIMEDOUT error not only for an actual
notify timeout but also when the lock owner doesn't respond, a busy
loop inside of rbd_acquire_lock() between rbd_try_acquire_lock() and
rbd_request_lock() is possible.

Requesting the lock on EBUSY error (returned by get_lock_owner_info()
if an incompatible lock or invalid lock owner is detected) makes very
little sense.  The same goes for ETIMEDOUT error (might pop up pretty
much anywhere if osd_request_timeout option is set) and many others.

Just fail I/O requests on rbd_dev->acquiring_list immediately on any
error from rbd_try_lock().

Cc: stable@vger.kernel.org # 58815900: rbd: retrieve and check lock owner twice before blocklisting
Cc: stable@vger.kernel.org
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
Reviewed-by: default avatarDongsheng Yang <dongsheng.yang@easystack.cn>
parent e7e607bd
...@@ -3675,7 +3675,7 @@ static int rbd_lock(struct rbd_device *rbd_dev) ...@@ -3675,7 +3675,7 @@ static int rbd_lock(struct rbd_device *rbd_dev)
ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie, RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
RBD_LOCK_TAG, "", 0); RBD_LOCK_TAG, "", 0);
if (ret) if (ret && ret != -EEXIST)
return ret; return ret;
__rbd_lock(rbd_dev, cookie); __rbd_lock(rbd_dev, cookie);
...@@ -3878,7 +3878,7 @@ static struct ceph_locker *get_lock_owner_info(struct rbd_device *rbd_dev) ...@@ -3878,7 +3878,7 @@ static struct ceph_locker *get_lock_owner_info(struct rbd_device *rbd_dev)
&rbd_dev->header_oloc, RBD_LOCK_NAME, &rbd_dev->header_oloc, RBD_LOCK_NAME,
&lock_type, &lock_tag, &lockers, &num_lockers); &lock_type, &lock_tag, &lockers, &num_lockers);
if (ret) { if (ret) {
rbd_warn(rbd_dev, "failed to retrieve lockers: %d", ret); rbd_warn(rbd_dev, "failed to get header lockers: %d", ret);
return ERR_PTR(ret); return ERR_PTR(ret);
} }
...@@ -3940,8 +3940,10 @@ static int find_watcher(struct rbd_device *rbd_dev, ...@@ -3940,8 +3940,10 @@ static int find_watcher(struct rbd_device *rbd_dev,
ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid, ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
&rbd_dev->header_oloc, &watchers, &rbd_dev->header_oloc, &watchers,
&num_watchers); &num_watchers);
if (ret) if (ret) {
rbd_warn(rbd_dev, "failed to get watchers: %d", ret);
return ret; return ret;
}
sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie); sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
for (i = 0; i < num_watchers; i++) { for (i = 0; i < num_watchers; i++) {
...@@ -3985,8 +3987,12 @@ static int rbd_try_lock(struct rbd_device *rbd_dev) ...@@ -3985,8 +3987,12 @@ static int rbd_try_lock(struct rbd_device *rbd_dev)
locker = refreshed_locker = NULL; locker = refreshed_locker = NULL;
ret = rbd_lock(rbd_dev); ret = rbd_lock(rbd_dev);
if (ret != -EBUSY) if (!ret)
goto out;
if (ret != -EBUSY) {
rbd_warn(rbd_dev, "failed to lock header: %d", ret);
goto out; goto out;
}
/* determine if the current lock holder is still alive */ /* determine if the current lock holder is still alive */
locker = get_lock_owner_info(rbd_dev); locker = get_lock_owner_info(rbd_dev);
...@@ -4089,11 +4095,8 @@ static int rbd_try_acquire_lock(struct rbd_device *rbd_dev) ...@@ -4089,11 +4095,8 @@ static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
ret = rbd_try_lock(rbd_dev); ret = rbd_try_lock(rbd_dev);
if (ret < 0) { if (ret < 0) {
rbd_warn(rbd_dev, "failed to lock header: %d", ret); rbd_warn(rbd_dev, "failed to acquire lock: %d", ret);
if (ret == -EBLOCKLISTED)
goto out; goto out;
ret = 1; /* request lock anyway */
} }
if (ret > 0) { if (ret > 0) {
up_write(&rbd_dev->lock_rwsem); up_write(&rbd_dev->lock_rwsem);
...@@ -6627,12 +6630,11 @@ static int rbd_add_acquire_lock(struct rbd_device *rbd_dev) ...@@ -6627,12 +6630,11 @@ static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
cancel_delayed_work_sync(&rbd_dev->lock_dwork); cancel_delayed_work_sync(&rbd_dev->lock_dwork);
if (!ret) if (!ret)
ret = -ETIMEDOUT; ret = -ETIMEDOUT;
}
if (ret) { rbd_warn(rbd_dev, "failed to acquire lock: %ld", ret);
rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
return ret;
} }
if (ret)
return ret;
/* /*
* The lock may have been released by now, unless automatic lock * The lock may have been released by now, unless automatic lock
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment