Commit e010dd0a authored by Ilya Dryomov's avatar Ilya Dryomov

rbd: exclusive map option

Support disabling automatic exclusive lock transfers to allow users
to be in charge of which node should own the lock while being able to
reuse exclusive lock's built-in blacklist/break-lock functionality.
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
Reviewed-by: default avatarJason Dillaman <dillaman@redhat.com>
parent 3b77faa0
......@@ -798,6 +798,7 @@ enum {
Opt_read_only,
Opt_read_write,
Opt_lock_on_read,
Opt_exclusive,
Opt_err
};
......@@ -810,6 +811,7 @@ static match_table_t rbd_opts_tokens = {
{Opt_read_write, "read_write"},
{Opt_read_write, "rw"}, /* Alternate spelling */
{Opt_lock_on_read, "lock_on_read"},
{Opt_exclusive, "exclusive"},
{Opt_err, NULL}
};
......@@ -817,11 +819,13 @@ struct rbd_options {
int queue_depth;
bool read_only;
bool lock_on_read;
bool exclusive;
};
#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT false
static int parse_rbd_opts_token(char *c, void *private)
{
......@@ -860,6 +864,9 @@ static int parse_rbd_opts_token(char *c, void *private)
case Opt_lock_on_read:
rbd_opts->lock_on_read = true;
break;
case Opt_exclusive:
rbd_opts->exclusive = true;
break;
default:
/* libceph prints "bad option" msg */
return -EINVAL;
......@@ -3440,6 +3447,18 @@ static void rbd_acquire_lock(struct work_struct *work)
ret = rbd_request_lock(rbd_dev);
if (ret == -ETIMEDOUT) {
goto again; /* treat this as a dead client */
} else if (ret == -EROFS) {
rbd_warn(rbd_dev, "peer will not release lock");
/*
* If this is rbd_add_acquire_lock(), we want to fail
* immediately -- reuse BLACKLISTED flag. Otherwise we
* want to block.
*/
if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
/* wake "rbd map --exclusive" process */
wake_requests(rbd_dev, false);
}
} else if (ret < 0) {
rbd_warn(rbd_dev, "error requesting lock: %d", ret);
mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
......@@ -3606,9 +3625,15 @@ static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
result = 0;
if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
dout("%s rbd_dev %p queueing unlock_work\n", __func__,
rbd_dev);
queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
if (!rbd_dev->opts->exclusive) {
dout("%s rbd_dev %p queueing unlock_work\n",
__func__, rbd_dev);
queue_work(rbd_dev->task_wq,
&rbd_dev->unlock_work);
} else {
/* refuse to release the lock */
result = -EROFS;
}
}
}
......@@ -4073,8 +4098,14 @@ static void rbd_queue_workfn(struct work_struct *work)
if (must_be_locked) {
down_read(&rbd_dev->lock_rwsem);
if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
!test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
!test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
if (rbd_dev->opts->exclusive) {
rbd_warn(rbd_dev, "exclusive lock required");
result = -EROFS;
goto err_unlock;
}
rbd_wait_state_locked(rbd_dev);
}
if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
result = -EBLACKLISTED;
goto err_unlock;
......@@ -5640,6 +5671,7 @@ static int rbd_add_parse_args(const char *buf,
rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
copts = ceph_parse_options(options, mon_addrs,
mon_addrs + mon_addrs_size - 1,
......@@ -5698,6 +5730,33 @@ static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
return ret;
}
static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
{
down_write(&rbd_dev->lock_rwsem);
if (__rbd_is_lock_owner(rbd_dev))
rbd_unlock(rbd_dev);
up_write(&rbd_dev->lock_rwsem);
}
static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
{
if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
return -EINVAL;
}
/* FIXME: "rbd map --exclusive" should be in interruptible */
down_read(&rbd_dev->lock_rwsem);
rbd_wait_state_locked(rbd_dev);
up_read(&rbd_dev->lock_rwsem);
if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
rbd_warn(rbd_dev, "failed to acquire exclusive lock");
return -EROFS;
}
return 0;
}
/*
* An rbd format 2 image has a unique identifier, distinct from the
* name given to it by the user. Internally, that identifier is
......@@ -6141,11 +6200,17 @@ static ssize_t do_rbd_add(struct bus_type *bus,
if (rc)
goto err_out_image_probe;
if (rbd_dev->opts->exclusive) {
rc = rbd_add_acquire_lock(rbd_dev);
if (rc)
goto err_out_device_setup;
}
/* Everything's ready. Announce the disk to the world. */
rc = device_add(&rbd_dev->dev);
if (rc)
goto err_out_device_setup;
goto err_out_image_lock;
add_disk(rbd_dev->disk);
/* see rbd_init_disk() */
......@@ -6163,6 +6228,8 @@ static ssize_t do_rbd_add(struct bus_type *bus,
module_put(THIS_MODULE);
return rc;
err_out_image_lock:
rbd_dev_image_unlock(rbd_dev);
err_out_device_setup:
rbd_dev_device_release(rbd_dev);
err_out_image_probe:
......@@ -6286,11 +6353,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
spin_unlock(&rbd_dev_list_lock);
device_del(&rbd_dev->dev);
down_write(&rbd_dev->lock_rwsem);
if (__rbd_is_lock_owner(rbd_dev))
rbd_unlock(rbd_dev);
up_write(&rbd_dev->lock_rwsem);
rbd_dev_image_unlock(rbd_dev);
rbd_dev_device_release(rbd_dev);
rbd_dev_image_release(rbd_dev);
rbd_dev_destroy(rbd_dev);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment