Commit 59c2be1e authored by Yehuda Sadeh's avatar Yehuda Sadeh Committed by Sage Weil

rbd: use watch/notify for changes in rbd header

Send notifications when we change the rbd header (e.g. create a snapshot)
and wait for such notifications.  This allows synchronizing the snapshot
creation between different rbd clients/rools.
Signed-off-by: default avatarYehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: default avatarSage Weil <sage@newdream.net>
parent a40c4f10
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include <linux/ceph/osd_client.h> #include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h> #include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h> #include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/kernel.h> #include <linux/kernel.h>
#include <linux/device.h> #include <linux/device.h>
...@@ -54,6 +55,8 @@ ...@@ -54,6 +55,8 @@
#define DEV_NAME_LEN 32 #define DEV_NAME_LEN 32
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
/* /*
* block device image metadata (in-memory version) * block device image metadata (in-memory version)
*/ */
...@@ -71,6 +74,12 @@ struct rbd_image_header { ...@@ -71,6 +74,12 @@ struct rbd_image_header {
char *snap_names; char *snap_names;
u64 *snap_sizes; u64 *snap_sizes;
u64 obj_version;
};
struct rbd_options {
int notify_timeout;
}; };
/* /*
...@@ -78,6 +87,7 @@ struct rbd_image_header { ...@@ -78,6 +87,7 @@ struct rbd_image_header {
*/ */
struct rbd_client { struct rbd_client {
struct ceph_client *client; struct ceph_client *client;
struct rbd_options *rbd_opts;
struct kref kref; struct kref kref;
struct list_head node; struct list_head node;
}; };
...@@ -124,6 +134,9 @@ struct rbd_device { ...@@ -124,6 +134,9 @@ struct rbd_device {
char pool_name[RBD_MAX_POOL_NAME_LEN]; char pool_name[RBD_MAX_POOL_NAME_LEN];
int poolid; int poolid;
struct ceph_osd_event *watch_event;
struct ceph_osd_request *watch_request;
char snap_name[RBD_MAX_SNAP_NAME_LEN]; char snap_name[RBD_MAX_SNAP_NAME_LEN];
u32 cur_snap; /* index+1 of current snapshot within snap context u32 cur_snap; /* index+1 of current snapshot within snap context
0 - for the head */ 0 - for the head */
...@@ -177,6 +190,8 @@ static void rbd_put_dev(struct rbd_device *rbd_dev) ...@@ -177,6 +190,8 @@ static void rbd_put_dev(struct rbd_device *rbd_dev)
put_device(&rbd_dev->dev); put_device(&rbd_dev->dev);
} }
static int __rbd_update_snaps(struct rbd_device *rbd_dev);
static int rbd_open(struct block_device *bdev, fmode_t mode) static int rbd_open(struct block_device *bdev, fmode_t mode)
{ {
struct gendisk *disk = bdev->bd_disk; struct gendisk *disk = bdev->bd_disk;
...@@ -211,7 +226,8 @@ static const struct block_device_operations rbd_bd_ops = { ...@@ -211,7 +226,8 @@ static const struct block_device_operations rbd_bd_ops = {
* Initialize an rbd client instance. * Initialize an rbd client instance.
* We own *opt. * We own *opt.
*/ */
static struct rbd_client *rbd_client_create(struct ceph_options *opt) static struct rbd_client *rbd_client_create(struct ceph_options *opt,
struct rbd_options *rbd_opts)
{ {
struct rbd_client *rbdc; struct rbd_client *rbdc;
int ret = -ENOMEM; int ret = -ENOMEM;
...@@ -233,6 +249,8 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt) ...@@ -233,6 +249,8 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt)
if (ret < 0) if (ret < 0)
goto out_err; goto out_err;
rbdc->rbd_opts = rbd_opts;
spin_lock(&node_lock); spin_lock(&node_lock);
list_add_tail(&rbdc->node, &rbd_client_list); list_add_tail(&rbdc->node, &rbd_client_list);
spin_unlock(&node_lock); spin_unlock(&node_lock);
...@@ -266,6 +284,59 @@ static struct rbd_client *__rbd_client_find(struct ceph_options *opt) ...@@ -266,6 +284,59 @@ static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
return NULL; return NULL;
} }
/*
* mount options
*/
enum {
Opt_notify_timeout,
Opt_last_int,
/* int args above */
Opt_last_string,
/* string args above */
};
static match_table_t rbdopt_tokens = {
{Opt_notify_timeout, "notify_timeout=%d"},
/* int args above */
/* string args above */
{-1, NULL}
};
static int parse_rbd_opts_token(char *c, void *private)
{
struct rbd_options *rbdopt = private;
substring_t argstr[MAX_OPT_ARGS];
int token, intval, ret;
token = match_token((char *)c, rbdopt_tokens, argstr);
if (token < 0)
return -EINVAL;
if (token < Opt_last_int) {
ret = match_int(&argstr[0], &intval);
if (ret < 0) {
pr_err("bad mount option arg (not int) "
"at '%s'\n", c);
return ret;
}
dout("got int token %d val %d\n", token, intval);
} else if (token > Opt_last_int && token < Opt_last_string) {
dout("got string token %d val %s\n", token,
argstr[0].from);
} else {
dout("got token %d\n", token);
}
switch (token) {
case Opt_notify_timeout:
rbdopt->notify_timeout = intval;
break;
default:
BUG_ON(token);
}
return 0;
}
/* /*
* Get a ceph client with specific addr and configuration, if one does * Get a ceph client with specific addr and configuration, if one does
* not exist create it. * not exist create it.
...@@ -276,11 +347,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, ...@@ -276,11 +347,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
struct rbd_client *rbdc; struct rbd_client *rbdc;
struct ceph_options *opt; struct ceph_options *opt;
int ret; int ret;
struct rbd_options *rbd_opts;
rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
if (!rbd_opts)
return -ENOMEM;
rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
ret = ceph_parse_options(&opt, options, mon_addr, ret = ceph_parse_options(&opt, options, mon_addr,
mon_addr + strlen(mon_addr), NULL, NULL); mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
if (ret < 0) if (ret < 0)
return ret; goto done_err;
spin_lock(&node_lock); spin_lock(&node_lock);
rbdc = __rbd_client_find(opt); rbdc = __rbd_client_find(opt);
...@@ -296,13 +374,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, ...@@ -296,13 +374,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
} }
spin_unlock(&node_lock); spin_unlock(&node_lock);
rbdc = rbd_client_create(opt); rbdc = rbd_client_create(opt, rbd_opts);
if (IS_ERR(rbdc)) if (IS_ERR(rbdc)) {
return PTR_ERR(rbdc); ret = PTR_ERR(rbdc);
goto done_err;
}
rbd_dev->rbd_client = rbdc; rbd_dev->rbd_client = rbdc;
rbd_dev->client = rbdc->client; rbd_dev->client = rbdc->client;
return 0; return 0;
done_err:
kfree(rbd_opts);
return ret;
} }
/* /*
...@@ -318,6 +401,7 @@ static void rbd_client_release(struct kref *kref) ...@@ -318,6 +401,7 @@ static void rbd_client_release(struct kref *kref)
spin_unlock(&node_lock); spin_unlock(&node_lock);
ceph_destroy_client(rbdc->client); ceph_destroy_client(rbdc->client);
kfree(rbdc->rbd_opts);
kfree(rbdc); kfree(rbdc);
} }
...@@ -666,7 +750,9 @@ static int rbd_do_request(struct request *rq, ...@@ -666,7 +750,9 @@ static int rbd_do_request(struct request *rq,
struct ceph_osd_req_op *ops, struct ceph_osd_req_op *ops,
int num_reply, int num_reply,
void (*rbd_cb)(struct ceph_osd_request *req, void (*rbd_cb)(struct ceph_osd_request *req,
struct ceph_msg *msg)) struct ceph_msg *msg),
struct ceph_osd_request **linger_req,
u64 *ver)
{ {
struct ceph_osd_request *req; struct ceph_osd_request *req;
struct ceph_file_layout *layout; struct ceph_file_layout *layout;
...@@ -729,12 +815,20 @@ static int rbd_do_request(struct request *rq, ...@@ -729,12 +815,20 @@ static int rbd_do_request(struct request *rq,
req->r_oid, req->r_oid_len); req->r_oid, req->r_oid_len);
up_read(&header->snap_rwsem); up_read(&header->snap_rwsem);
if (linger_req) {
ceph_osdc_set_request_linger(&dev->client->osdc, req);
*linger_req = req;
}
ret = ceph_osdc_start_request(&dev->client->osdc, req, false); ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
if (ret < 0) if (ret < 0)
goto done_err; goto done_err;
if (!rbd_cb) { if (!rbd_cb) {
ret = ceph_osdc_wait_request(&dev->client->osdc, req); ret = ceph_osdc_wait_request(&dev->client->osdc, req);
if (ver)
*ver = le64_to_cpu(req->r_reassert_version.version);
dout("reassert_ver=%lld\n", le64_to_cpu(req->r_reassert_version.version));
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
} }
return ret; return ret;
...@@ -789,6 +883,11 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) ...@@ -789,6 +883,11 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
kfree(req_data); kfree(req_data);
} }
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
ceph_osdc_put_request(req);
}
/* /*
* Do a synchronous ceph osd operation * Do a synchronous ceph osd operation
*/ */
...@@ -801,7 +900,9 @@ static int rbd_req_sync_op(struct rbd_device *dev, ...@@ -801,7 +900,9 @@ static int rbd_req_sync_op(struct rbd_device *dev,
int num_reply, int num_reply,
const char *obj, const char *obj,
u64 ofs, u64 len, u64 ofs, u64 len,
char *buf) char *buf,
struct ceph_osd_request **linger_req,
u64 *ver)
{ {
int ret; int ret;
struct page **pages; struct page **pages;
...@@ -833,7 +934,8 @@ static int rbd_req_sync_op(struct rbd_device *dev, ...@@ -833,7 +934,8 @@ static int rbd_req_sync_op(struct rbd_device *dev,
flags, flags,
ops, ops,
2, 2,
NULL); NULL,
linger_req, ver);
if (ret < 0) if (ret < 0)
goto done_ops; goto done_ops;
...@@ -893,7 +995,7 @@ static int rbd_do_op(struct request *rq, ...@@ -893,7 +995,7 @@ static int rbd_do_op(struct request *rq,
flags, flags,
ops, ops,
num_reply, num_reply,
rbd_req_cb); rbd_req_cb, 0, NULL);
done: done:
kfree(seg_name); kfree(seg_name);
return ret; return ret;
...@@ -940,18 +1042,174 @@ static int rbd_req_sync_read(struct rbd_device *dev, ...@@ -940,18 +1042,174 @@ static int rbd_req_sync_read(struct rbd_device *dev,
u64 snapid, u64 snapid,
const char *obj, const char *obj,
u64 ofs, u64 len, u64 ofs, u64 len,
char *buf) char *buf,
u64 *ver)
{ {
return rbd_req_sync_op(dev, NULL, return rbd_req_sync_op(dev, NULL,
(snapid ? snapid : CEPH_NOSNAP), (snapid ? snapid : CEPH_NOSNAP),
CEPH_OSD_OP_READ, CEPH_OSD_OP_READ,
CEPH_OSD_FLAG_READ, CEPH_OSD_FLAG_READ,
NULL, NULL,
1, obj, ofs, len, buf); 1, obj, ofs, len, buf, NULL, ver);
} }
/* /*
* Request sync osd read * Request sync osd watch
*/
static int rbd_req_sync_notify_ack(struct rbd_device *dev,
u64 ver,
u64 notify_id,
const char *obj)
{
struct ceph_osd_req_op *ops;
struct page **pages = NULL;
int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
if (ret < 0)
return ret;
ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
ops[0].watch.cookie = notify_id;
ops[0].watch.flag = 0;
ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
obj, 0, 0, NULL,
pages, 0,
CEPH_OSD_FLAG_READ,
ops,
1,
rbd_simple_req_cb, 0, NULL);
rbd_destroy_ops(ops);
return ret;
}
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
struct rbd_device *dev = (struct rbd_device *)data;
if (!dev)
return;
dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
notify_id, (int)opcode);
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
__rbd_update_snaps(dev);
mutex_unlock(&ctl_mutex);
rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
}
/*
* Request sync osd watch
*/
static int rbd_req_sync_watch(struct rbd_device *dev,
const char *obj,
u64 ver)
{
struct ceph_osd_req_op *ops;
struct ceph_osd_client *osdc = &dev->client->osdc;
int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
if (ret < 0)
return ret;
ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
(void *)dev, &dev->watch_event);
if (ret < 0)
goto fail;
ops[0].watch.ver = cpu_to_le64(ver);
ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
ops[0].watch.flag = 1;
ret = rbd_req_sync_op(dev, NULL,
CEPH_NOSNAP,
0,
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
ops,
1, obj, 0, 0, NULL,
&dev->watch_request, NULL);
if (ret < 0)
goto fail_event;
rbd_destroy_ops(ops);
return 0;
fail_event:
ceph_osdc_cancel_event(dev->watch_event);
dev->watch_event = NULL;
fail:
rbd_destroy_ops(ops);
return ret;
}
struct rbd_notify_info {
struct rbd_device *dev;
};
static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
struct rbd_device *dev = (struct rbd_device *)data;
if (!dev)
return;
dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
notify_id, (int)opcode);
}
/*
* Request sync osd notify
*/
static int rbd_req_sync_notify(struct rbd_device *dev,
const char *obj)
{
struct ceph_osd_req_op *ops;
struct ceph_osd_client *osdc = &dev->client->osdc;
struct ceph_osd_event *event;
struct rbd_notify_info info;
int payload_len = sizeof(u32) + sizeof(u32);
int ret;
ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
if (ret < 0)
return ret;
info.dev = dev;
ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
(void *)&info, &event);
if (ret < 0)
goto fail;
ops[0].watch.ver = 1;
ops[0].watch.flag = 1;
ops[0].watch.cookie = event->cookie;
ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
ops[0].watch.timeout = 12;
ret = rbd_req_sync_op(dev, NULL,
CEPH_NOSNAP,
0,
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
ops,
1, obj, 0, 0, NULL, NULL, NULL);
if (ret < 0)
goto fail_event;
ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
dout("ceph_osdc_wait_event returned %d\n", ret);
rbd_destroy_ops(ops);
return 0;
fail_event:
ceph_osdc_cancel_event(event);
fail:
rbd_destroy_ops(ops);
return ret;
}
/*
* Request sync osd rollback
*/ */
static int rbd_req_sync_rollback_obj(struct rbd_device *dev, static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
u64 snapid, u64 snapid,
...@@ -969,13 +1227,10 @@ static int rbd_req_sync_rollback_obj(struct rbd_device *dev, ...@@ -969,13 +1227,10 @@ static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
0, 0,
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
ops, ops,
1, obj, 0, 0, NULL); 1, obj, 0, 0, NULL, NULL, NULL);
rbd_destroy_ops(ops); rbd_destroy_ops(ops);
if (ret < 0)
return ret;
return ret; return ret;
} }
...@@ -987,7 +1242,8 @@ static int rbd_req_sync_exec(struct rbd_device *dev, ...@@ -987,7 +1242,8 @@ static int rbd_req_sync_exec(struct rbd_device *dev,
const char *cls, const char *cls,
const char *method, const char *method,
const char *data, const char *data,
int len) int len,
u64 *ver)
{ {
struct ceph_osd_req_op *ops; struct ceph_osd_req_op *ops;
int cls_len = strlen(cls); int cls_len = strlen(cls);
...@@ -1010,7 +1266,7 @@ static int rbd_req_sync_exec(struct rbd_device *dev, ...@@ -1010,7 +1266,7 @@ static int rbd_req_sync_exec(struct rbd_device *dev,
0, 0,
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
ops, ops,
1, obj, 0, 0, NULL); 1, obj, 0, 0, NULL, NULL, ver);
rbd_destroy_ops(ops); rbd_destroy_ops(ops);
...@@ -1156,6 +1412,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev, ...@@ -1156,6 +1412,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
struct rbd_image_header_ondisk *dh; struct rbd_image_header_ondisk *dh;
int snap_count = 0; int snap_count = 0;
u64 snap_names_len = 0; u64 snap_names_len = 0;
u64 ver;
while (1) { while (1) {
int len = sizeof(*dh) + int len = sizeof(*dh) +
...@@ -1171,7 +1428,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev, ...@@ -1171,7 +1428,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
NULL, CEPH_NOSNAP, NULL, CEPH_NOSNAP,
rbd_dev->obj_md_name, rbd_dev->obj_md_name,
0, len, 0, len,
(char *)dh); (char *)dh, &ver);
if (rc < 0) if (rc < 0)
goto out_dh; goto out_dh;
...@@ -1188,6 +1445,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev, ...@@ -1188,6 +1445,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
} }
break; break;
} }
header->obj_version = ver;
out_dh: out_dh:
kfree(dh); kfree(dh);
...@@ -1205,6 +1463,7 @@ static int rbd_header_add_snap(struct rbd_device *dev, ...@@ -1205,6 +1463,7 @@ static int rbd_header_add_snap(struct rbd_device *dev,
u64 new_snapid; u64 new_snapid;
int ret; int ret;
void *data, *data_start, *data_end; void *data, *data_start, *data_end;
u64 ver;
/* we should create a snapshot only if we're pointing at the head */ /* we should create a snapshot only if we're pointing at the head */
if (dev->cur_snap) if (dev->cur_snap)
...@@ -1227,7 +1486,7 @@ static int rbd_header_add_snap(struct rbd_device *dev, ...@@ -1227,7 +1486,7 @@ static int rbd_header_add_snap(struct rbd_device *dev,
ceph_encode_64_safe(&data, data_end, new_snapid, bad); ceph_encode_64_safe(&data, data_end, new_snapid, bad);
ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add", ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
data_start, data - data_start); data_start, data - data_start, &ver);
kfree(data_start); kfree(data_start);
...@@ -1259,6 +1518,7 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev) ...@@ -1259,6 +1518,7 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
int ret; int ret;
struct rbd_image_header h; struct rbd_image_header h;
u64 snap_seq; u64 snap_seq;
int follow_seq = 0;
ret = rbd_read_header(rbd_dev, &h); ret = rbd_read_header(rbd_dev, &h);
if (ret < 0) if (ret < 0)
...@@ -1267,6 +1527,11 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev) ...@@ -1267,6 +1527,11 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
down_write(&rbd_dev->header.snap_rwsem); down_write(&rbd_dev->header.snap_rwsem);
snap_seq = rbd_dev->header.snapc->seq; snap_seq = rbd_dev->header.snapc->seq;
if (rbd_dev->header.total_snaps &&
rbd_dev->header.snapc->snaps[0] == snap_seq)
/* pointing at the head, will need to follow that
if head moves */
follow_seq = 1;
kfree(rbd_dev->header.snapc); kfree(rbd_dev->header.snapc);
kfree(rbd_dev->header.snap_names); kfree(rbd_dev->header.snap_names);
...@@ -1277,6 +1542,9 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev) ...@@ -1277,6 +1542,9 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
rbd_dev->header.snap_names = h.snap_names; rbd_dev->header.snap_names = h.snap_names;
rbd_dev->header.snap_names_len = h.snap_names_len; rbd_dev->header.snap_names_len = h.snap_names_len;
rbd_dev->header.snap_sizes = h.snap_sizes; rbd_dev->header.snap_sizes = h.snap_sizes;
if (follow_seq)
rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
else
rbd_dev->header.snapc->seq = snap_seq; rbd_dev->header.snapc->seq = snap_seq;
ret = __rbd_init_snaps_header(rbd_dev); ret = __rbd_init_snaps_header(rbd_dev);
...@@ -1699,7 +1967,28 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev) ...@@ -1699,7 +1967,28 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
device_unregister(&rbd_dev->dev); device_unregister(&rbd_dev->dev);
} }
static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count) static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
int ret, rc;
do {
ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
rbd_dev->header.obj_version);
if (ret == -ERANGE) {
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
rc = __rbd_update_snaps(rbd_dev);
mutex_unlock(&ctl_mutex);
if (rc < 0)
return rc;
}
} while (ret == -ERANGE);
return ret;
}
static ssize_t rbd_add(struct bus_type *bus,
const char *buf,
size_t count)
{ {
struct ceph_osd_client *osdc; struct ceph_osd_client *osdc;
struct rbd_device *rbd_dev; struct rbd_device *rbd_dev;
...@@ -1797,6 +2086,10 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count) ...@@ -1797,6 +2086,10 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count)
if (rc) if (rc)
goto err_out_bus; goto err_out_bus;
rc = rbd_init_watch_dev(rbd_dev);
if (rc)
goto err_out_bus;
return count; return count;
err_out_bus: err_out_bus:
...@@ -1849,6 +2142,12 @@ static void rbd_dev_release(struct device *dev) ...@@ -1849,6 +2142,12 @@ static void rbd_dev_release(struct device *dev)
struct rbd_device *rbd_dev = struct rbd_device *rbd_dev =
container_of(dev, struct rbd_device, dev); container_of(dev, struct rbd_device, dev);
if (rbd_dev->watch_request)
ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
rbd_dev->watch_request);
if (rbd_dev->watch_event)
ceph_osdc_cancel_event(rbd_dev->watch_event);
rbd_put_client(rbd_dev); rbd_put_client(rbd_dev);
/* clean up and free blkdev */ /* clean up and free blkdev */
...@@ -1914,14 +2213,24 @@ static ssize_t rbd_snap_add(struct device *dev, ...@@ -1914,14 +2213,24 @@ static ssize_t rbd_snap_add(struct device *dev,
ret = rbd_header_add_snap(rbd_dev, ret = rbd_header_add_snap(rbd_dev,
name, GFP_KERNEL); name, GFP_KERNEL);
if (ret < 0) if (ret < 0)
goto done_unlock; goto err_unlock;
ret = __rbd_update_snaps(rbd_dev); ret = __rbd_update_snaps(rbd_dev);
if (ret < 0) if (ret < 0)
goto done_unlock; goto err_unlock;
/* shouldn't hold ctl_mutex when notifying.. notify might
trigger a watch callback that would need to get that mutex */
mutex_unlock(&ctl_mutex);
/* make a best effort, don't error if failed */
rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
ret = count; ret = count;
done_unlock: kfree(name);
return ret;
err_unlock:
mutex_unlock(&ctl_mutex); mutex_unlock(&ctl_mutex);
kfree(name); kfree(name);
return ret; return ret;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment