Commit 969e5aa3 authored by Alex Elder's avatar Alex Elder

Merge branch 'testing' of github.com:ceph/ceph-client into v3.8-rc5-testing

parents 949db153 1ec3911d
...@@ -52,8 +52,11 @@ ...@@ -52,8 +52,11 @@
#define SECTOR_SHIFT 9 #define SECTOR_SHIFT 9
#define SECTOR_SIZE (1ULL << SECTOR_SHIFT) #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
/* It might be useful to have this defined elsewhere too */ /* It might be useful to have these defined elsewhere */
#define U8_MAX ((u8) (~0U))
#define U16_MAX ((u16) (~0U))
#define U32_MAX ((u32) (~0U))
#define U64_MAX ((u64) (~0ULL)) #define U64_MAX ((u64) (~0ULL))
#define RBD_DRV_NAME "rbd" #define RBD_DRV_NAME "rbd"
...@@ -66,7 +69,6 @@ ...@@ -66,7 +69,6 @@
(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1)) (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */ #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN 1024
#define RBD_SNAP_HEAD_NAME "-" #define RBD_SNAP_HEAD_NAME "-"
...@@ -93,8 +95,6 @@ ...@@ -93,8 +95,6 @@
#define DEV_NAME_LEN 32 #define DEV_NAME_LEN 32
#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1) #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
#define RBD_READ_ONLY_DEFAULT false
/* /*
* block device image metadata (in-memory version) * block device image metadata (in-memory version)
*/ */
...@@ -119,16 +119,33 @@ struct rbd_image_header { ...@@ -119,16 +119,33 @@ struct rbd_image_header {
* An rbd image specification. * An rbd image specification.
* *
* The tuple (pool_id, image_id, snap_id) is sufficient to uniquely * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
* identify an image. * identify an image. Each rbd_dev structure includes a pointer to
* an rbd_spec structure that encapsulates this identity.
*
* Each of the id's in an rbd_spec has an associated name. For a
* user-mapped image, the names are supplied and the id's associated
* with them are looked up. For a layered image, a parent image is
* defined by the tuple, and the names are looked up.
*
* An rbd_dev structure contains a parent_spec pointer which is
* non-null if the image it represents is a child in a layered
* image. This pointer will refer to the rbd_spec structure used
* by the parent rbd_dev for its own identity (i.e., the structure
* is shared between the parent and child).
*
* Since these structures are populated once, during the discovery
* phase of image construction, they are effectively immutable so
* we make no effort to synchronize access to them.
*
* Note that code herein does not assume the image name is known (it
* could be a null pointer).
*/ */
struct rbd_spec { struct rbd_spec {
u64 pool_id; u64 pool_id;
char *pool_name; char *pool_name;
char *image_id; char *image_id;
size_t image_id_len;
char *image_name; char *image_name;
size_t image_name_len;
u64 snap_id; u64 snap_id;
char *snap_name; char *snap_name;
...@@ -136,10 +153,6 @@ struct rbd_spec { ...@@ -136,10 +153,6 @@ struct rbd_spec {
struct kref kref; struct kref kref;
}; };
struct rbd_options {
bool read_only;
};
/* /*
* an instance of the client. multiple devices may share an rbd client. * an instance of the client. multiple devices may share an rbd client.
*/ */
...@@ -154,7 +167,7 @@ struct rbd_client { ...@@ -154,7 +167,7 @@ struct rbd_client {
*/ */
struct rbd_req_status { struct rbd_req_status {
int done; int done;
int rc; s32 rc;
u64 bytes; u64 bytes;
}; };
...@@ -212,11 +225,13 @@ struct rbd_device { ...@@ -212,11 +225,13 @@ struct rbd_device {
spinlock_t lock; /* queue lock */ spinlock_t lock; /* queue lock */
struct rbd_image_header header; struct rbd_image_header header;
bool exists; atomic_t exists;
struct rbd_spec *spec; struct rbd_spec *spec;
char *header_name; char *header_name;
struct ceph_file_layout layout;
struct ceph_osd_event *watch_event; struct ceph_osd_event *watch_event;
struct ceph_osd_request *watch_request; struct ceph_osd_request *watch_request;
...@@ -277,6 +292,33 @@ static struct device rbd_root_dev = { ...@@ -277,6 +292,33 @@ static struct device rbd_root_dev = {
.release = rbd_root_dev_release, .release = rbd_root_dev_release,
}; };
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
struct va_format vaf;
va_list args;
va_start(args, fmt);
vaf.fmt = fmt;
vaf.va = &args;
if (!rbd_dev)
printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
else if (rbd_dev->disk)
printk(KERN_WARNING "%s: %s: %pV\n",
RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
else if (rbd_dev->spec && rbd_dev->spec->image_name)
printk(KERN_WARNING "%s: image %s: %pV\n",
RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
else if (rbd_dev->spec && rbd_dev->spec->image_id)
printk(KERN_WARNING "%s: id %s: %pV\n",
RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
else /* punt */
printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
RBD_DRV_NAME, rbd_dev, &vaf);
va_end(args);
}
#ifdef RBD_DEBUG #ifdef RBD_DEBUG
#define rbd_assert(expr) \ #define rbd_assert(expr) \
if (unlikely(!(expr))) { \ if (unlikely(!(expr))) { \
...@@ -426,6 +468,12 @@ static match_table_t rbd_opts_tokens = { ...@@ -426,6 +468,12 @@ static match_table_t rbd_opts_tokens = {
{-1, NULL} {-1, NULL}
}; };
struct rbd_options {
bool read_only;
};
#define RBD_READ_ONLY_DEFAULT false
static int parse_rbd_opts_token(char *c, void *private) static int parse_rbd_opts_token(char *c, void *private)
{ {
struct rbd_options *rbd_opts = private; struct rbd_options *rbd_opts = private;
...@@ -707,7 +755,7 @@ static int rbd_dev_set_mapping(struct rbd_device *rbd_dev) ...@@ -707,7 +755,7 @@ static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
goto done; goto done;
rbd_dev->mapping.read_only = true; rbd_dev->mapping.read_only = true;
} }
rbd_dev->exists = true; atomic_set(&rbd_dev->exists, 1);
done: done:
return ret; return ret;
} }
...@@ -724,7 +772,7 @@ static void rbd_header_free(struct rbd_image_header *header) ...@@ -724,7 +772,7 @@ static void rbd_header_free(struct rbd_image_header *header)
header->snapc = NULL; header->snapc = NULL;
} }
static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{ {
char *name; char *name;
u64 segment; u64 segment;
...@@ -772,6 +820,7 @@ static int rbd_get_num_segments(struct rbd_image_header *header, ...@@ -772,6 +820,7 @@ static int rbd_get_num_segments(struct rbd_image_header *header,
{ {
u64 start_seg; u64 start_seg;
u64 end_seg; u64 end_seg;
u64 result;
if (!len) if (!len)
return 0; return 0;
...@@ -781,7 +830,11 @@ static int rbd_get_num_segments(struct rbd_image_header *header, ...@@ -781,7 +830,11 @@ static int rbd_get_num_segments(struct rbd_image_header *header,
start_seg = ofs >> header->obj_order; start_seg = ofs >> header->obj_order;
end_seg = (ofs + len - 1) >> header->obj_order; end_seg = (ofs + len - 1) >> header->obj_order;
return end_seg - start_seg + 1; result = end_seg - start_seg + 1;
if (result > (u64) INT_MAX)
return -ERANGE;
return (int) result;
} }
/* /*
...@@ -949,8 +1002,10 @@ static struct bio *bio_chain_clone_range(struct bio **bio_src, ...@@ -949,8 +1002,10 @@ static struct bio *bio_chain_clone_range(struct bio **bio_src,
unsigned int bi_size; unsigned int bi_size;
struct bio *bio; struct bio *bio;
if (!bi) if (!bi) {
rbd_warn(NULL, "bio_chain exhausted with %u left", len);
goto out_err; /* EINVAL; ran out of bio's */ goto out_err; /* EINVAL; ran out of bio's */
}
bi_size = min_t(unsigned int, bi->bi_size - off, len); bi_size = min_t(unsigned int, bi->bi_size - off, len);
bio = bio_clone_range(bi, off, bi_size, gfpmask); bio = bio_clone_range(bi, off, bi_size, gfpmask);
if (!bio) if (!bio)
...@@ -976,44 +1031,84 @@ static struct bio *bio_chain_clone_range(struct bio **bio_src, ...@@ -976,44 +1031,84 @@ static struct bio *bio_chain_clone_range(struct bio **bio_src,
return NULL; return NULL;
} }
/* struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
* helpers for osd request op vectors.
*/
static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
int opcode, u32 payload_len)
{ {
struct ceph_osd_req_op *ops; struct ceph_osd_req_op *op;
va_list args;
size_t size;
ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO); op = kzalloc(sizeof (*op), GFP_NOIO);
if (!ops) if (!op)
return NULL; return NULL;
op->op = opcode;
va_start(args, opcode);
switch (opcode) {
case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_WRITE:
/* rbd_osd_req_op_create(READ, offset, length) */
/* rbd_osd_req_op_create(WRITE, offset, length) */
op->extent.offset = va_arg(args, u64);
op->extent.length = va_arg(args, u64);
if (opcode == CEPH_OSD_OP_WRITE)
op->payload_len = op->extent.length;
break;
case CEPH_OSD_OP_CALL:
/* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
op->cls.class_name = va_arg(args, char *);
size = strlen(op->cls.class_name);
rbd_assert(size <= (size_t) U8_MAX);
op->cls.class_len = size;
op->payload_len = size;
op->cls.method_name = va_arg(args, char *);
size = strlen(op->cls.method_name);
rbd_assert(size <= (size_t) U8_MAX);
op->cls.method_len = size;
op->payload_len += size;
op->cls.argc = 0;
op->cls.indata = va_arg(args, void *);
size = va_arg(args, size_t);
rbd_assert(size <= (size_t) U32_MAX);
op->cls.indata_len = (u32) size;
op->payload_len += size;
break;
case CEPH_OSD_OP_NOTIFY_ACK:
case CEPH_OSD_OP_WATCH:
/* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
/* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
op->watch.cookie = va_arg(args, u64);
op->watch.ver = va_arg(args, u64);
op->watch.ver = cpu_to_le64(op->watch.ver);
if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
op->watch.flag = (u8) 1;
break;
default:
rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
kfree(op);
op = NULL;
break;
}
va_end(args);
ops[0].op = opcode; return op;
/*
* op extent offset and length will be set later on
* in calc_raw_layout()
*/
ops[0].payload_len = payload_len;
return ops;
} }
static void rbd_destroy_ops(struct ceph_osd_req_op *ops) static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
{ {
kfree(ops); kfree(op);
} }
static void rbd_coll_end_req_index(struct request *rq, static void rbd_coll_end_req_index(struct request *rq,
struct rbd_req_coll *coll, struct rbd_req_coll *coll,
int index, int index,
int ret, u64 len) s32 ret, u64 len)
{ {
struct request_queue *q; struct request_queue *q;
int min, max, i; int min, max, i;
dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n", dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
coll, index, ret, (unsigned long long) len); coll, index, (int)ret, (unsigned long long)len);
if (!rq) if (!rq)
return; return;
...@@ -1034,7 +1129,7 @@ static void rbd_coll_end_req_index(struct request *rq, ...@@ -1034,7 +1129,7 @@ static void rbd_coll_end_req_index(struct request *rq,
max++; max++;
for (i = min; i<max; i++) { for (i = min; i<max; i++) {
__blk_end_request(rq, coll->status[i].rc, __blk_end_request(rq, (int)coll->status[i].rc,
coll->status[i].bytes); coll->status[i].bytes);
coll->num_done++; coll->num_done++;
kref_put(&coll->kref, rbd_coll_release); kref_put(&coll->kref, rbd_coll_release);
...@@ -1042,10 +1137,12 @@ static void rbd_coll_end_req_index(struct request *rq, ...@@ -1042,10 +1137,12 @@ static void rbd_coll_end_req_index(struct request *rq,
spin_unlock_irq(q->queue_lock); spin_unlock_irq(q->queue_lock);
} }
static void rbd_coll_end_req(struct rbd_request *req, static void rbd_coll_end_req(struct rbd_request *rbd_req,
int ret, u64 len) s32 ret, u64 len)
{ {
rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len); rbd_coll_end_req_index(rbd_req->rq,
rbd_req->coll, rbd_req->coll_index,
ret, len);
} }
/* /*
...@@ -1060,117 +1157,102 @@ static int rbd_do_request(struct request *rq, ...@@ -1060,117 +1157,102 @@ static int rbd_do_request(struct request *rq,
struct page **pages, struct page **pages,
int num_pages, int num_pages,
int flags, int flags,
struct ceph_osd_req_op *ops, struct ceph_osd_req_op *op,
struct rbd_req_coll *coll, struct rbd_req_coll *coll,
int coll_index, int coll_index,
void (*rbd_cb)(struct ceph_osd_request *req, void (*rbd_cb)(struct ceph_osd_request *,
struct ceph_msg *msg), struct ceph_msg *),
struct ceph_osd_request **linger_req,
u64 *ver) u64 *ver)
{ {
struct ceph_osd_request *req;
struct ceph_file_layout *layout;
int ret;
u64 bno;
struct timespec mtime = CURRENT_TIME;
struct rbd_request *req_data;
struct ceph_osd_request_head *reqhead;
struct ceph_osd_client *osdc; struct ceph_osd_client *osdc;
struct ceph_osd_request *osd_req;
req_data = kzalloc(sizeof(*req_data), GFP_NOIO); struct rbd_request *rbd_req = NULL;
if (!req_data) { struct timespec mtime = CURRENT_TIME;
if (coll) int ret;
rbd_coll_end_req_index(rq, coll, coll_index,
-ENOMEM, len);
return -ENOMEM;
}
if (coll) {
req_data->coll = coll;
req_data->coll_index = coll_index;
}
dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n", dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
object_name, (unsigned long long) ofs, object_name, (unsigned long long) ofs,
(unsigned long long) len, coll, coll_index); (unsigned long long) len, coll, coll_index);
osdc = &rbd_dev->rbd_client->client->osdc; osdc = &rbd_dev->rbd_client->client->osdc;
req = ceph_osdc_alloc_request(osdc, flags, snapc, ops, osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_NOIO);
false, GFP_NOIO, pages, bio); if (!osd_req)
if (!req) { return -ENOMEM;
ret = -ENOMEM;
goto done_pages;
}
req->r_callback = rbd_cb; osd_req->r_flags = flags;
osd_req->r_pages = pages;
if (bio) {
osd_req->r_bio = bio;
bio_get(osd_req->r_bio);
}
req_data->rq = rq; if (coll) {
req_data->bio = bio; ret = -ENOMEM;
req_data->pages = pages; rbd_req = kmalloc(sizeof(*rbd_req), GFP_NOIO);
req_data->len = len; if (!rbd_req)
goto done_osd_req;
req->r_priv = req_data; rbd_req->rq = rq;
rbd_req->bio = bio;
rbd_req->pages = pages;
rbd_req->len = len;
rbd_req->coll = coll;
rbd_req->coll_index = coll_index;
}
reqhead = req->r_request->front.iov_base; osd_req->r_callback = rbd_cb;
reqhead->snapid = cpu_to_le64(CEPH_NOSNAP); osd_req->r_priv = rbd_req;
strncpy(req->r_oid, object_name, sizeof(req->r_oid)); strncpy(osd_req->r_oid, object_name, sizeof(osd_req->r_oid));
req->r_oid_len = strlen(req->r_oid); osd_req->r_oid_len = strlen(osd_req->r_oid);
layout = &req->r_file_layout; osd_req->r_file_layout = rbd_dev->layout; /* struct */
memset(layout, 0, sizeof(*layout)); osd_req->r_num_pages = calc_pages_for(ofs, len);
layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); osd_req->r_page_alignment = ofs & ~PAGE_MASK;
layout->fl_stripe_count = cpu_to_le32(1);
layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
req, ops);
rbd_assert(ret == 0);
ceph_osdc_build_request(req, ofs, &len, ceph_osdc_build_request(osd_req, ofs, len, 1, op,
ops, snapc, snapid, &mtime);
snapc,
&mtime,
req->r_oid, req->r_oid_len);
if (linger_req) { if (op->op == CEPH_OSD_OP_WATCH && op->watch.flag) {
ceph_osdc_set_request_linger(osdc, req); ceph_osdc_set_request_linger(osdc, osd_req);
*linger_req = req; rbd_dev->watch_request = osd_req;
} }
ret = ceph_osdc_start_request(osdc, req, false); ret = ceph_osdc_start_request(osdc, osd_req, false);
if (ret < 0) if (ret < 0)
goto done_err; goto done_err;
if (!rbd_cb) { if (!rbd_cb) {
ret = ceph_osdc_wait_request(osdc, req); u64 version;
ret = ceph_osdc_wait_request(osdc, osd_req);
version = le64_to_cpu(osd_req->r_reassert_version.version);
if (ver) if (ver)
*ver = le64_to_cpu(req->r_reassert_version.version); *ver = version;
dout("reassert_ver=%llu\n", dout("reassert_ver=%llu\n", (unsigned long long) version);
(unsigned long long) ceph_osdc_put_request(osd_req);
le64_to_cpu(req->r_reassert_version.version));
ceph_osdc_put_request(req);
} }
return ret; return ret;
done_err: done_err:
bio_chain_put(req_data->bio); if (bio)
ceph_osdc_put_request(req); bio_chain_put(osd_req->r_bio);
done_pages: kfree(rbd_req);
rbd_coll_end_req(req_data, ret, len); done_osd_req:
kfree(req_data); ceph_osdc_put_request(osd_req);
return ret; return ret;
} }
/* /*
* Ceph osd op callback * Ceph osd op callback
*/ */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) static void rbd_req_cb(struct ceph_osd_request *osd_req, struct ceph_msg *msg)
{ {
struct rbd_request *req_data = req->r_priv; struct rbd_request *rbd_req = osd_req->r_priv;
struct ceph_osd_reply_head *replyhead; struct ceph_osd_reply_head *replyhead;
struct ceph_osd_op *op; struct ceph_osd_op *op;
__s32 rc; s32 rc;
u64 bytes; u64 bytes;
int read_op; int read_op;
...@@ -1178,68 +1260,66 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) ...@@ -1178,68 +1260,66 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
replyhead = msg->front.iov_base; replyhead = msg->front.iov_base;
WARN_ON(le32_to_cpu(replyhead->num_ops) == 0); WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
op = (void *)(replyhead + 1); op = (void *)(replyhead + 1);
rc = le32_to_cpu(replyhead->result); rc = (s32)le32_to_cpu(replyhead->result);
bytes = le64_to_cpu(op->extent.length); bytes = le64_to_cpu(op->extent.length);
read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ); read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n", dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
(unsigned long long) bytes, read_op, (int) rc); (unsigned long long) bytes, read_op, (int) rc);
if (rc == -ENOENT && read_op) { if (rc == (s32)-ENOENT && read_op) {
zero_bio_chain(req_data->bio, 0); zero_bio_chain(rbd_req->bio, 0);
rc = 0; rc = 0;
} else if (rc == 0 && read_op && bytes < req_data->len) { } else if (rc == 0 && read_op && bytes < rbd_req->len) {
zero_bio_chain(req_data->bio, bytes); zero_bio_chain(rbd_req->bio, bytes);
bytes = req_data->len; bytes = rbd_req->len;
} }
rbd_coll_end_req(req_data, rc, bytes); rbd_coll_end_req(rbd_req, rc, bytes);
if (req_data->bio) if (rbd_req->bio)
bio_chain_put(req_data->bio); bio_chain_put(rbd_req->bio);
ceph_osdc_put_request(req); ceph_osdc_put_request(osd_req);
kfree(req_data); kfree(rbd_req);
} }
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) static void rbd_simple_req_cb(struct ceph_osd_request *osd_req,
struct ceph_msg *msg)
{ {
ceph_osdc_put_request(req); ceph_osdc_put_request(osd_req);
} }
/* /*
* Do a synchronous ceph osd operation * Do a synchronous ceph osd operation
*/ */
static int rbd_req_sync_op(struct rbd_device *rbd_dev, static int rbd_req_sync_op(struct rbd_device *rbd_dev,
struct ceph_snap_context *snapc,
u64 snapid,
int flags, int flags,
struct ceph_osd_req_op *ops, struct ceph_osd_req_op *op,
const char *object_name, const char *object_name,
u64 ofs, u64 inbound_size, u64 ofs, u64 inbound_size,
char *inbound, char *inbound,
struct ceph_osd_request **linger_req,
u64 *ver) u64 *ver)
{ {
int ret; int ret;
struct page **pages; struct page **pages;
int num_pages; int num_pages;
rbd_assert(ops != NULL); rbd_assert(op != NULL);
num_pages = calc_pages_for(ofs, inbound_size); num_pages = calc_pages_for(ofs, inbound_size);
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
if (IS_ERR(pages)) if (IS_ERR(pages))
return PTR_ERR(pages); return PTR_ERR(pages);
ret = rbd_do_request(NULL, rbd_dev, snapc, snapid, ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
object_name, ofs, inbound_size, NULL, object_name, ofs, inbound_size, NULL,
pages, num_pages, pages, num_pages,
flags, flags,
ops, op,
NULL, 0, NULL, 0,
NULL, NULL,
linger_req, ver); ver);
if (ret < 0) if (ret < 0)
goto done; goto done;
...@@ -1262,12 +1342,11 @@ static int rbd_do_op(struct request *rq, ...@@ -1262,12 +1342,11 @@ static int rbd_do_op(struct request *rq,
struct rbd_req_coll *coll, struct rbd_req_coll *coll,
int coll_index) int coll_index)
{ {
char *seg_name; const char *seg_name;
u64 seg_ofs; u64 seg_ofs;
u64 seg_len; u64 seg_len;
int ret; int ret;
struct ceph_osd_req_op *ops; struct ceph_osd_req_op *op;
u32 payload_len;
int opcode; int opcode;
int flags; int flags;
u64 snapid; u64 snapid;
...@@ -1282,18 +1361,16 @@ static int rbd_do_op(struct request *rq, ...@@ -1282,18 +1361,16 @@ static int rbd_do_op(struct request *rq,
opcode = CEPH_OSD_OP_WRITE; opcode = CEPH_OSD_OP_WRITE;
flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK; flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
snapid = CEPH_NOSNAP; snapid = CEPH_NOSNAP;
payload_len = seg_len;
} else { } else {
opcode = CEPH_OSD_OP_READ; opcode = CEPH_OSD_OP_READ;
flags = CEPH_OSD_FLAG_READ; flags = CEPH_OSD_FLAG_READ;
snapc = NULL; rbd_assert(!snapc);
snapid = rbd_dev->spec->snap_id; snapid = rbd_dev->spec->snap_id;
payload_len = 0;
} }
ret = -ENOMEM; ret = -ENOMEM;
ops = rbd_create_rw_ops(1, opcode, payload_len); op = rbd_osd_req_op_create(opcode, seg_ofs, seg_len);
if (!ops) if (!op)
goto done; goto done;
/* we've taken care of segment sizes earlier when we /* we've taken care of segment sizes earlier when we
...@@ -1306,11 +1383,13 @@ static int rbd_do_op(struct request *rq, ...@@ -1306,11 +1383,13 @@ static int rbd_do_op(struct request *rq,
bio, bio,
NULL, 0, NULL, 0,
flags, flags,
ops, op,
coll, coll_index, coll, coll_index,
rbd_req_cb, 0, NULL); rbd_req_cb, NULL);
if (ret < 0)
rbd_destroy_ops(ops); rbd_coll_end_req_index(rq, coll, coll_index,
(s32)ret, seg_len);
rbd_osd_req_op_destroy(op);
done: done:
kfree(seg_name); kfree(seg_name);
return ret; return ret;
...@@ -1320,24 +1399,21 @@ static int rbd_do_op(struct request *rq, ...@@ -1320,24 +1399,21 @@ static int rbd_do_op(struct request *rq,
* Request sync osd read * Request sync osd read
*/ */
static int rbd_req_sync_read(struct rbd_device *rbd_dev, static int rbd_req_sync_read(struct rbd_device *rbd_dev,
u64 snapid,
const char *object_name, const char *object_name,
u64 ofs, u64 len, u64 ofs, u64 len,
char *buf, char *buf,
u64 *ver) u64 *ver)
{ {
struct ceph_osd_req_op *ops; struct ceph_osd_req_op *op;
int ret; int ret;
ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0); op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, ofs, len);
if (!ops) if (!op)
return -ENOMEM; return -ENOMEM;
ret = rbd_req_sync_op(rbd_dev, NULL, ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ,
snapid, op, object_name, ofs, len, buf, ver);
CEPH_OSD_FLAG_READ, rbd_osd_req_op_destroy(op);
ops, object_name, ofs, len, buf, NULL, ver);
rbd_destroy_ops(ops);
return ret; return ret;
} }
...@@ -1349,26 +1425,23 @@ static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev, ...@@ -1349,26 +1425,23 @@ static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
u64 ver, u64 ver,
u64 notify_id) u64 notify_id)
{ {
struct ceph_osd_req_op *ops; struct ceph_osd_req_op *op;
int ret; int ret;
ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0); op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
if (!ops) if (!op)
return -ENOMEM; return -ENOMEM;
ops[0].watch.ver = cpu_to_le64(ver);
ops[0].watch.cookie = notify_id;
ops[0].watch.flag = 0;
ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP, ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
rbd_dev->header_name, 0, 0, NULL, rbd_dev->header_name, 0, 0, NULL,
NULL, 0, NULL, 0,
CEPH_OSD_FLAG_READ, CEPH_OSD_FLAG_READ,
ops, op,
NULL, 0, NULL, 0,
rbd_simple_req_cb, 0, NULL); rbd_simple_req_cb, NULL);
rbd_osd_req_op_destroy(op);
rbd_destroy_ops(ops);
return ret; return ret;
} }
...@@ -1386,83 +1459,51 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) ...@@ -1386,83 +1459,51 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
(unsigned int) opcode); (unsigned int) opcode);
rc = rbd_dev_refresh(rbd_dev, &hver); rc = rbd_dev_refresh(rbd_dev, &hver);
if (rc) if (rc)
pr_warning(RBD_DRV_NAME "%d got notification but failed to " rbd_warn(rbd_dev, "got notification but failed to "
" update snaps: %d\n", rbd_dev->major, rc); " update snaps: %d\n", rc);
rbd_req_sync_notify_ack(rbd_dev, hver, notify_id); rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
} }
/* /*
* Request sync osd watch * Request sync osd watch/unwatch. The value of "start" determines
* whether a watch request is being initiated or torn down.
*/ */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev) static int rbd_req_sync_watch(struct rbd_device *rbd_dev, int start)
{ {
struct ceph_osd_req_op *ops; struct ceph_osd_req_op *op;
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; int ret = 0;
int ret;
ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
if (!ops)
return -ENOMEM;
ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
(void *)rbd_dev, &rbd_dev->watch_event);
if (ret < 0)
goto fail;
ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version); rbd_assert(start ^ !!rbd_dev->watch_event);
ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie); rbd_assert(start ^ !!rbd_dev->watch_request);
ops[0].watch.flag = 1;
ret = rbd_req_sync_op(rbd_dev, NULL, if (start) {
CEPH_NOSNAP, struct ceph_osd_client *osdc;
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
ops,
rbd_dev->header_name,
0, 0, NULL,
&rbd_dev->watch_request, NULL);
osdc = &rbd_dev->rbd_client->client->osdc;
ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, rbd_dev,
&rbd_dev->watch_event);
if (ret < 0) if (ret < 0)
goto fail_event;
rbd_destroy_ops(ops);
return 0;
fail_event:
ceph_osdc_cancel_event(rbd_dev->watch_event);
rbd_dev->watch_event = NULL;
fail:
rbd_destroy_ops(ops);
return ret; return ret;
} }
/*
* Request sync osd unwatch
*/
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
struct ceph_osd_req_op *ops;
int ret;
ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
if (!ops)
return -ENOMEM;
ops[0].watch.ver = 0;
ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
ops[0].watch.flag = 0;
ret = rbd_req_sync_op(rbd_dev, NULL, op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
CEPH_NOSNAP, rbd_dev->watch_event->cookie,
rbd_dev->header.obj_version, start);
if (op)
ret = rbd_req_sync_op(rbd_dev,
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
ops, op, rbd_dev->header_name,
rbd_dev->header_name, 0, 0, NULL, NULL);
0, 0, NULL, NULL, NULL);
/* Cancel the event if we're tearing down, or on error */
rbd_destroy_ops(ops); if (!start || !op || ret < 0) {
ceph_osdc_cancel_event(rbd_dev->watch_event); ceph_osdc_cancel_event(rbd_dev->watch_event);
rbd_dev->watch_event = NULL; rbd_dev->watch_event = NULL;
}
rbd_osd_req_op_destroy(op);
return ret; return ret;
} }
...@@ -1477,13 +1518,9 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev, ...@@ -1477,13 +1518,9 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
size_t outbound_size, size_t outbound_size,
char *inbound, char *inbound,
size_t inbound_size, size_t inbound_size,
int flags,
u64 *ver) u64 *ver)
{ {
struct ceph_osd_req_op *ops; struct ceph_osd_req_op *op;
int class_name_len = strlen(class_name);
int method_name_len = strlen(method_name);
int payload_size;
int ret; int ret;
/* /*
...@@ -1494,26 +1531,16 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev, ...@@ -1494,26 +1531,16 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
* the perspective of the server side) in the OSD request * the perspective of the server side) in the OSD request
* operation. * operation.
*/ */
payload_size = class_name_len + method_name_len + outbound_size; op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size); method_name, outbound, outbound_size);
if (!ops) if (!op)
return -ENOMEM; return -ENOMEM;
ops[0].cls.class_name = class_name; ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ, op,
ops[0].cls.class_len = (__u8) class_name_len;
ops[0].cls.method_name = method_name;
ops[0].cls.method_len = (__u8) method_name_len;
ops[0].cls.argc = 0;
ops[0].cls.indata = outbound;
ops[0].cls.indata_len = outbound_size;
ret = rbd_req_sync_op(rbd_dev, NULL,
CEPH_NOSNAP,
flags, ops,
object_name, 0, inbound_size, inbound, object_name, 0, inbound_size, inbound,
NULL, ver); ver);
rbd_destroy_ops(ops); rbd_osd_req_op_destroy(op);
dout("cls_exec returned %d\n", ret); dout("cls_exec returned %d\n", ret);
return ret; return ret;
...@@ -1533,113 +1560,123 @@ static struct rbd_req_coll *rbd_alloc_coll(int num_reqs) ...@@ -1533,113 +1560,123 @@ static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
return coll; return coll;
} }
/* static int rbd_dev_do_request(struct request *rq,
* block device queue callback struct rbd_device *rbd_dev,
*/ struct ceph_snap_context *snapc,
static void rbd_rq_fn(struct request_queue *q) u64 ofs, unsigned int size,
struct bio *bio_chain)
{ {
struct rbd_device *rbd_dev = q->queuedata; int num_segs;
struct request *rq;
while ((rq = blk_fetch_request(q))) {
struct bio *bio;
bool do_write;
unsigned int size;
u64 ofs;
int num_segs, cur_seg = 0;
struct rbd_req_coll *coll; struct rbd_req_coll *coll;
struct ceph_snap_context *snapc;
unsigned int bio_offset; unsigned int bio_offset;
int cur_seg = 0;
dout("fetched request\n");
/* filter out block requests we don't understand */
if ((rq->cmd_type != REQ_TYPE_FS)) {
__blk_end_request_all(rq, 0);
continue;
}
/* deduce our operation (read, write) */
do_write = (rq_data_dir(rq) == WRITE);
if (do_write && rbd_dev->mapping.read_only) {
__blk_end_request_all(rq, -EROFS);
continue;
}
spin_unlock_irq(q->queue_lock);
down_read(&rbd_dev->header_rwsem);
if (!rbd_dev->exists) {
rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
up_read(&rbd_dev->header_rwsem);
dout("request for non-existent snapshot");
spin_lock_irq(q->queue_lock);
__blk_end_request_all(rq, -ENXIO);
continue;
}
snapc = ceph_get_snap_context(rbd_dev->header.snapc);
up_read(&rbd_dev->header_rwsem);
size = blk_rq_bytes(rq);
ofs = blk_rq_pos(rq) * SECTOR_SIZE;
bio = rq->bio;
dout("%s 0x%x bytes at 0x%llx\n", dout("%s 0x%x bytes at 0x%llx\n",
do_write ? "write" : "read", rq_data_dir(rq) == WRITE ? "write" : "read",
size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE); size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size); num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
if (num_segs <= 0) { if (num_segs <= 0)
spin_lock_irq(q->queue_lock); return num_segs;
__blk_end_request_all(rq, num_segs);
ceph_put_snap_context(snapc);
continue;
}
coll = rbd_alloc_coll(num_segs); coll = rbd_alloc_coll(num_segs);
if (!coll) { if (!coll)
spin_lock_irq(q->queue_lock); return -ENOMEM;
__blk_end_request_all(rq, -ENOMEM);
ceph_put_snap_context(snapc);
continue;
}
bio_offset = 0; bio_offset = 0;
do { do {
u64 limit = rbd_segment_length(rbd_dev, ofs, size); u64 limit = rbd_segment_length(rbd_dev, ofs, size);
unsigned int chain_size; unsigned int clone_size;
struct bio *bio_chain; struct bio *bio_clone;
BUG_ON(limit > (u64) UINT_MAX); BUG_ON(limit > (u64)UINT_MAX);
chain_size = (unsigned int) limit; clone_size = (unsigned int)limit;
dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt); dout("bio_chain->bi_vcnt=%hu\n", bio_chain->bi_vcnt);
kref_get(&coll->kref); kref_get(&coll->kref);
/* Pass a cloned bio chain via an osd request */ /* Pass a cloned bio chain via an osd request */
bio_chain = bio_chain_clone_range(&bio, bio_clone = bio_chain_clone_range(&bio_chain,
&bio_offset, chain_size, &bio_offset, clone_size,
GFP_ATOMIC); GFP_ATOMIC);
if (bio_chain) if (bio_clone)
(void) rbd_do_op(rq, rbd_dev, snapc, (void)rbd_do_op(rq, rbd_dev, snapc,
ofs, chain_size, ofs, clone_size,
bio_chain, coll, cur_seg); bio_clone, coll, cur_seg);
else else
rbd_coll_end_req_index(rq, coll, cur_seg, rbd_coll_end_req_index(rq, coll, cur_seg,
-ENOMEM, chain_size); (s32)-ENOMEM,
size -= chain_size; clone_size);
ofs += chain_size; size -= clone_size;
ofs += clone_size;
cur_seg++; cur_seg++;
} while (size > 0); } while (size > 0);
kref_put(&coll->kref, rbd_coll_release); kref_put(&coll->kref, rbd_coll_release);
spin_lock_irq(q->queue_lock); return 0;
}
/*
* block device queue callback
*/
static void rbd_rq_fn(struct request_queue *q)
{
struct rbd_device *rbd_dev = q->queuedata;
bool read_only = rbd_dev->mapping.read_only;
struct request *rq;
while ((rq = blk_fetch_request(q))) {
struct ceph_snap_context *snapc = NULL;
unsigned int size = 0;
int result;
dout("fetched request\n");
/* Filter out block requests we don't understand */
if ((rq->cmd_type != REQ_TYPE_FS)) {
__blk_end_request_all(rq, 0);
continue;
}
spin_unlock_irq(q->queue_lock);
/* Write requests need a reference to the snapshot context */
if (rq_data_dir(rq) == WRITE) {
result = -EROFS;
if (read_only) /* Can't write to a read-only device */
goto out_end_request;
/*
* Note that each osd request will take its
* own reference to the snapshot context
* supplied. The reference we take here
* just guarantees the one we provide stays
* valid.
*/
down_read(&rbd_dev->header_rwsem);
snapc = ceph_get_snap_context(rbd_dev->header.snapc);
up_read(&rbd_dev->header_rwsem);
rbd_assert(snapc != NULL);
} else if (!atomic_read(&rbd_dev->exists)) {
rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
dout("request for non-existent snapshot");
result = -ENXIO;
goto out_end_request;
}
size = blk_rq_bytes(rq);
result = rbd_dev_do_request(rq, rbd_dev, snapc,
blk_rq_pos(rq) * SECTOR_SIZE,
size, rq->bio);
out_end_request:
if (snapc)
ceph_put_snap_context(snapc); ceph_put_snap_context(snapc);
spin_lock_irq(q->queue_lock);
if (!size || result < 0)
__blk_end_request_all(rq, result);
} }
} }
...@@ -1741,8 +1778,7 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version) ...@@ -1741,8 +1778,7 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
if (!ondisk) if (!ondisk)
return ERR_PTR(-ENOMEM); return ERR_PTR(-ENOMEM);
ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP, ret = rbd_req_sync_read(rbd_dev, rbd_dev->header_name,
rbd_dev->header_name,
0, size, 0, size,
(char *) ondisk, version); (char *) ondisk, version);
...@@ -1750,15 +1786,13 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version) ...@@ -1750,15 +1786,13 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
goto out_err; goto out_err;
if (WARN_ON((size_t) ret < size)) { if (WARN_ON((size_t) ret < size)) {
ret = -ENXIO; ret = -ENXIO;
pr_warning("short header read for image %s" rbd_warn(rbd_dev, "short header read (want %zd got %d)",
" (want %zd got %d)\n", size, ret);
rbd_dev->spec->image_name, size, ret);
goto out_err; goto out_err;
} }
if (!rbd_dev_ondisk_valid(ondisk)) { if (!rbd_dev_ondisk_valid(ondisk)) {
ret = -ENXIO; ret = -ENXIO;
pr_warning("invalid header for image %s\n", rbd_warn(rbd_dev, "invalid header");
rbd_dev->spec->image_name);
goto out_err; goto out_err;
} }
...@@ -2243,6 +2277,7 @@ struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, ...@@ -2243,6 +2277,7 @@ struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
return NULL; return NULL;
spin_lock_init(&rbd_dev->lock); spin_lock_init(&rbd_dev->lock);
atomic_set(&rbd_dev->exists, 0);
INIT_LIST_HEAD(&rbd_dev->node); INIT_LIST_HEAD(&rbd_dev->node);
INIT_LIST_HEAD(&rbd_dev->snaps); INIT_LIST_HEAD(&rbd_dev->snaps);
init_rwsem(&rbd_dev->header_rwsem); init_rwsem(&rbd_dev->header_rwsem);
...@@ -2250,6 +2285,13 @@ struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, ...@@ -2250,6 +2285,13 @@ struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
rbd_dev->spec = spec; rbd_dev->spec = spec;
rbd_dev->rbd_client = rbdc; rbd_dev->rbd_client = rbdc;
/* Initialize the layout used for all rbd requests */
rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
return rbd_dev; return rbd_dev;
} }
...@@ -2363,8 +2405,7 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, ...@@ -2363,8 +2405,7 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
"rbd", "get_size", "rbd", "get_size",
(char *) &snapid, sizeof (snapid), (char *) &snapid, sizeof (snapid),
(char *) &size_buf, sizeof (size_buf), (char *) &size_buf, sizeof (size_buf), NULL);
CEPH_OSD_FLAG_READ, NULL);
dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
if (ret < 0) if (ret < 0)
return ret; return ret;
...@@ -2399,8 +2440,7 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) ...@@ -2399,8 +2440,7 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
"rbd", "get_object_prefix", "rbd", "get_object_prefix",
NULL, 0, NULL, 0,
reply_buf, RBD_OBJ_PREFIX_LEN_MAX, reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
CEPH_OSD_FLAG_READ, NULL);
dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
if (ret < 0) if (ret < 0)
goto out; goto out;
...@@ -2439,7 +2479,7 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, ...@@ -2439,7 +2479,7 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
"rbd", "get_features", "rbd", "get_features",
(char *) &snapid, sizeof (snapid), (char *) &snapid, sizeof (snapid),
(char *) &features_buf, sizeof (features_buf), (char *) &features_buf, sizeof (features_buf),
CEPH_OSD_FLAG_READ, NULL); NULL);
dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
if (ret < 0) if (ret < 0)
return ret; return ret;
...@@ -2474,7 +2514,6 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) ...@@ -2474,7 +2514,6 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
void *end; void *end;
char *image_id; char *image_id;
u64 overlap; u64 overlap;
size_t len = 0;
int ret; int ret;
parent_spec = rbd_spec_alloc(); parent_spec = rbd_spec_alloc();
...@@ -2495,8 +2534,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) ...@@ -2495,8 +2534,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
"rbd", "get_parent", "rbd", "get_parent",
(char *) &snapid, sizeof (snapid), (char *) &snapid, sizeof (snapid),
(char *) reply_buf, size, (char *) reply_buf, size, NULL);
CEPH_OSD_FLAG_READ, NULL);
dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
if (ret < 0) if (ret < 0)
goto out_err; goto out_err;
...@@ -2508,13 +2546,18 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) ...@@ -2508,13 +2546,18 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
if (parent_spec->pool_id == CEPH_NOPOOL) if (parent_spec->pool_id == CEPH_NOPOOL)
goto out; /* No parent? No problem. */ goto out; /* No parent? No problem. */
image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); /* The ceph file layout needs to fit pool id in 32 bits */
ret = -EIO;
if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
goto out;
image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
if (IS_ERR(image_id)) { if (IS_ERR(image_id)) {
ret = PTR_ERR(image_id); ret = PTR_ERR(image_id);
goto out_err; goto out_err;
} }
parent_spec->image_id = image_id; parent_spec->image_id = image_id;
parent_spec->image_id_len = len;
ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err); ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
ceph_decode_64_safe(&p, end, overlap, out_err); ceph_decode_64_safe(&p, end, overlap, out_err);
...@@ -2544,15 +2587,15 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev) ...@@ -2544,15 +2587,15 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
rbd_assert(!rbd_dev->spec->image_name); rbd_assert(!rbd_dev->spec->image_name);
image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len; len = strlen(rbd_dev->spec->image_id);
image_id_size = sizeof (__le32) + len;
image_id = kmalloc(image_id_size, GFP_KERNEL); image_id = kmalloc(image_id_size, GFP_KERNEL);
if (!image_id) if (!image_id)
return NULL; return NULL;
p = image_id; p = image_id;
end = (char *) image_id + image_id_size; end = (char *) image_id + image_id_size;
ceph_encode_string(&p, end, rbd_dev->spec->image_id, ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
(u32) rbd_dev->spec->image_id_len);
size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX; size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
reply_buf = kmalloc(size, GFP_KERNEL); reply_buf = kmalloc(size, GFP_KERNEL);
...@@ -2562,8 +2605,7 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev) ...@@ -2562,8 +2605,7 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY, ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
"rbd", "dir_get_name", "rbd", "dir_get_name",
image_id, image_id_size, image_id, image_id_size,
(char *) reply_buf, size, (char *) reply_buf, size, NULL);
CEPH_OSD_FLAG_READ, NULL);
if (ret < 0) if (ret < 0)
goto out; goto out;
p = reply_buf; p = reply_buf;
...@@ -2602,8 +2644,11 @@ static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev) ...@@ -2602,8 +2644,11 @@ static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
osdc = &rbd_dev->rbd_client->client->osdc; osdc = &rbd_dev->rbd_client->client->osdc;
name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id); name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
if (!name) if (!name) {
return -EIO; /* pool id too large (>= 2^31) */ rbd_warn(rbd_dev, "there is no pool with id %llu",
rbd_dev->spec->pool_id); /* Really a BUG() */
return -EIO;
}
rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL); rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
if (!rbd_dev->spec->pool_name) if (!rbd_dev->spec->pool_name)
...@@ -2612,19 +2657,17 @@ static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev) ...@@ -2612,19 +2657,17 @@ static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
/* Fetch the image name; tolerate failure here */ /* Fetch the image name; tolerate failure here */
name = rbd_dev_image_name(rbd_dev); name = rbd_dev_image_name(rbd_dev);
if (name) { if (name)
rbd_dev->spec->image_name_len = strlen(name);
rbd_dev->spec->image_name = (char *) name; rbd_dev->spec->image_name = (char *) name;
} else { else
pr_warning(RBD_DRV_NAME "%d " rbd_warn(rbd_dev, "unable to get image name");
"unable to get image name for image id %s\n",
rbd_dev->major, rbd_dev->spec->image_id);
}
/* Look up the snapshot name. */ /* Look up the snapshot name. */
name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id); name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
if (!name) { if (!name) {
rbd_warn(rbd_dev, "no snapshot with id %llu",
rbd_dev->spec->snap_id); /* Really a BUG() */
ret = -EIO; ret = -EIO;
goto out_err; goto out_err;
} }
...@@ -2668,8 +2711,7 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver) ...@@ -2668,8 +2711,7 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
"rbd", "get_snapcontext", "rbd", "get_snapcontext",
NULL, 0, NULL, 0,
reply_buf, size, reply_buf, size, ver);
CEPH_OSD_FLAG_READ, ver);
dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
if (ret < 0) if (ret < 0)
goto out; goto out;
...@@ -2738,8 +2780,7 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which) ...@@ -2738,8 +2780,7 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name, ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
"rbd", "get_snapshot_name", "rbd", "get_snapshot_name",
(char *) &snap_id, sizeof (snap_id), (char *) &snap_id, sizeof (snap_id),
reply_buf, size, reply_buf, size, NULL);
CEPH_OSD_FLAG_READ, NULL);
dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
if (ret < 0) if (ret < 0)
goto out; goto out;
...@@ -2766,7 +2807,7 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which) ...@@ -2766,7 +2807,7 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which, static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
u64 *snap_size, u64 *snap_features) u64 *snap_size, u64 *snap_features)
{ {
__le64 snap_id; u64 snap_id;
u8 order; u8 order;
int ret; int ret;
...@@ -2868,7 +2909,7 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev) ...@@ -2868,7 +2909,7 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
/* Existing snapshot not in the new snap context */ /* Existing snapshot not in the new snap context */
if (rbd_dev->spec->snap_id == snap->id) if (rbd_dev->spec->snap_id == snap->id)
rbd_dev->exists = false; atomic_set(&rbd_dev->exists, 0);
rbd_remove_snap_dev(snap); rbd_remove_snap_dev(snap);
dout("%ssnap id %llu has been removed\n", dout("%ssnap id %llu has been removed\n",
rbd_dev->spec->snap_id == snap->id ? rbd_dev->spec->snap_id == snap->id ?
...@@ -2983,22 +3024,6 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev) ...@@ -2983,22 +3024,6 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
device_unregister(&rbd_dev->dev); device_unregister(&rbd_dev->dev);
} }
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
int ret, rc;
do {
ret = rbd_req_sync_watch(rbd_dev);
if (ret == -ERANGE) {
rc = rbd_dev_refresh(rbd_dev, NULL);
if (rc < 0)
return rc;
}
} while (ret == -ERANGE);
return ret;
}
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0); static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
/* /*
...@@ -3138,11 +3163,9 @@ static inline char *dup_token(const char **buf, size_t *lenp) ...@@ -3138,11 +3163,9 @@ static inline char *dup_token(const char **buf, size_t *lenp)
size_t len; size_t len;
len = next_token(buf); len = next_token(buf);
dup = kmalloc(len + 1, GFP_KERNEL); dup = kmemdup(*buf, len + 1, GFP_KERNEL);
if (!dup) if (!dup)
return NULL; return NULL;
memcpy(dup, *buf, len);
*(dup + len) = '\0'; *(dup + len) = '\0';
*buf += len; *buf += len;
...@@ -3210,8 +3233,10 @@ static int rbd_add_parse_args(const char *buf, ...@@ -3210,8 +3233,10 @@ static int rbd_add_parse_args(const char *buf,
/* The first four tokens are required */ /* The first four tokens are required */
len = next_token(&buf); len = next_token(&buf);
if (!len) if (!len) {
return -EINVAL; /* Missing monitor address(es) */ rbd_warn(NULL, "no monitor address(es) provided");
return -EINVAL;
}
mon_addrs = buf; mon_addrs = buf;
mon_addrs_size = len + 1; mon_addrs_size = len + 1;
buf += len; buf += len;
...@@ -3220,8 +3245,10 @@ static int rbd_add_parse_args(const char *buf, ...@@ -3220,8 +3245,10 @@ static int rbd_add_parse_args(const char *buf,
options = dup_token(&buf, NULL); options = dup_token(&buf, NULL);
if (!options) if (!options)
return -ENOMEM; return -ENOMEM;
if (!*options) if (!*options) {
goto out_err; /* Missing options */ rbd_warn(NULL, "no options provided");
goto out_err;
}
spec = rbd_spec_alloc(); spec = rbd_spec_alloc();
if (!spec) if (!spec)
...@@ -3230,14 +3257,18 @@ static int rbd_add_parse_args(const char *buf, ...@@ -3230,14 +3257,18 @@ static int rbd_add_parse_args(const char *buf,
spec->pool_name = dup_token(&buf, NULL); spec->pool_name = dup_token(&buf, NULL);
if (!spec->pool_name) if (!spec->pool_name)
goto out_mem; goto out_mem;
if (!*spec->pool_name) if (!*spec->pool_name) {
goto out_err; /* Missing pool name */ rbd_warn(NULL, "no pool name provided");
goto out_err;
}
spec->image_name = dup_token(&buf, &spec->image_name_len); spec->image_name = dup_token(&buf, NULL);
if (!spec->image_name) if (!spec->image_name)
goto out_mem; goto out_mem;
if (!*spec->image_name) if (!*spec->image_name) {
goto out_err; /* Missing image name */ rbd_warn(NULL, "no image name provided");
goto out_err;
}
/* /*
* Snapshot name is optional; default is to use "-" * Snapshot name is optional; default is to use "-"
...@@ -3251,10 +3282,9 @@ static int rbd_add_parse_args(const char *buf, ...@@ -3251,10 +3282,9 @@ static int rbd_add_parse_args(const char *buf,
ret = -ENAMETOOLONG; ret = -ENAMETOOLONG;
goto out_err; goto out_err;
} }
spec->snap_name = kmalloc(len + 1, GFP_KERNEL); spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
if (!spec->snap_name) if (!spec->snap_name)
goto out_mem; goto out_mem;
memcpy(spec->snap_name, buf, len);
*(spec->snap_name + len) = '\0'; *(spec->snap_name + len) = '\0';
/* Initialize all rbd options to the defaults */ /* Initialize all rbd options to the defaults */
...@@ -3323,7 +3353,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) ...@@ -3323,7 +3353,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
* First, see if the format 2 image id file exists, and if * First, see if the format 2 image id file exists, and if
* so, get the image's persistent id from it. * so, get the image's persistent id from it.
*/ */
size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len; size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
object_name = kmalloc(size, GFP_NOIO); object_name = kmalloc(size, GFP_NOIO);
if (!object_name) if (!object_name)
return -ENOMEM; return -ENOMEM;
...@@ -3342,8 +3372,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) ...@@ -3342,8 +3372,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
ret = rbd_req_sync_exec(rbd_dev, object_name, ret = rbd_req_sync_exec(rbd_dev, object_name,
"rbd", "get_id", "rbd", "get_id",
NULL, 0, NULL, 0,
response, RBD_IMAGE_ID_LEN_MAX, response, RBD_IMAGE_ID_LEN_MAX, NULL);
CEPH_OSD_FLAG_READ, NULL);
dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret); dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
if (ret < 0) if (ret < 0)
goto out; goto out;
...@@ -3352,8 +3381,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) ...@@ -3352,8 +3381,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
p = response; p = response;
rbd_dev->spec->image_id = ceph_extract_encoded_string(&p, rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
p + RBD_IMAGE_ID_LEN_MAX, p + RBD_IMAGE_ID_LEN_MAX,
&rbd_dev->spec->image_id_len, NULL, GFP_NOIO);
GFP_NOIO);
if (IS_ERR(rbd_dev->spec->image_id)) { if (IS_ERR(rbd_dev->spec->image_id)) {
ret = PTR_ERR(rbd_dev->spec->image_id); ret = PTR_ERR(rbd_dev->spec->image_id);
rbd_dev->spec->image_id = NULL; rbd_dev->spec->image_id = NULL;
...@@ -3377,11 +3405,10 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev) ...@@ -3377,11 +3405,10 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL); rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
if (!rbd_dev->spec->image_id) if (!rbd_dev->spec->image_id)
return -ENOMEM; return -ENOMEM;
rbd_dev->spec->image_id_len = 0;
/* Record the header object name for this rbd image. */ /* Record the header object name for this rbd image. */
size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX); size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
rbd_dev->header_name = kmalloc(size, GFP_KERNEL); rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
if (!rbd_dev->header_name) { if (!rbd_dev->header_name) {
ret = -ENOMEM; ret = -ENOMEM;
...@@ -3427,7 +3454,7 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev) ...@@ -3427,7 +3454,7 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
* Image id was filled in by the caller. Record the header * Image id was filled in by the caller. Record the header
* object name for this rbd image. * object name for this rbd image.
*/ */
size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len; size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
rbd_dev->header_name = kmalloc(size, GFP_KERNEL); rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
if (!rbd_dev->header_name) if (!rbd_dev->header_name)
return -ENOMEM; return -ENOMEM;
...@@ -3542,7 +3569,7 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev) ...@@ -3542,7 +3569,7 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
if (ret) if (ret)
goto err_out_bus; goto err_out_bus;
ret = rbd_init_watch_dev(rbd_dev); ret = rbd_req_sync_watch(rbd_dev, 1);
if (ret) if (ret)
goto err_out_bus; goto err_out_bus;
...@@ -3638,6 +3665,13 @@ static ssize_t rbd_add(struct bus_type *bus, ...@@ -3638,6 +3665,13 @@ static ssize_t rbd_add(struct bus_type *bus,
goto err_out_client; goto err_out_client;
spec->pool_id = (u64) rc; spec->pool_id = (u64) rc;
/* The ceph file layout needs to fit pool id in 32 bits */
if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
rc = -EIO;
goto err_out_client;
}
rbd_dev = rbd_dev_create(rbdc, spec); rbd_dev = rbd_dev_create(rbdc, spec);
if (!rbd_dev) if (!rbd_dev)
goto err_out_client; goto err_out_client;
...@@ -3698,8 +3732,7 @@ static void rbd_dev_release(struct device *dev) ...@@ -3698,8 +3732,7 @@ static void rbd_dev_release(struct device *dev)
rbd_dev->watch_request); rbd_dev->watch_request);
} }
if (rbd_dev->watch_event) if (rbd_dev->watch_event)
rbd_req_sync_unwatch(rbd_dev); rbd_req_sync_watch(rbd_dev, 0);
/* clean up and free blkdev */ /* clean up and free blkdev */
rbd_free_disk(rbd_dev); rbd_free_disk(rbd_dev);
......
...@@ -611,8 +611,16 @@ int ceph_add_cap(struct inode *inode, ...@@ -611,8 +611,16 @@ int ceph_add_cap(struct inode *inode,
if (flags & CEPH_CAP_FLAG_AUTH) if (flags & CEPH_CAP_FLAG_AUTH)
ci->i_auth_cap = cap; ci->i_auth_cap = cap;
else if (ci->i_auth_cap == cap) else if (ci->i_auth_cap == cap) {
ci->i_auth_cap = NULL; ci->i_auth_cap = NULL;
spin_lock(&mdsc->cap_dirty_lock);
if (!list_empty(&ci->i_dirty_item)) {
dout(" moving %p to cap_dirty_migrating\n", inode);
list_move(&ci->i_dirty_item,
&mdsc->cap_dirty_migrating);
}
spin_unlock(&mdsc->cap_dirty_lock);
}
dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n", dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n",
inode, ceph_vinop(inode), cap, ceph_cap_string(issued), inode, ceph_vinop(inode), cap, ceph_cap_string(issued),
...@@ -1460,7 +1468,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ...@@ -1460,7 +1468,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_client *mdsc = fsc->mdsc;
struct inode *inode = &ci->vfs_inode; struct inode *inode = &ci->vfs_inode;
struct ceph_cap *cap; struct ceph_cap *cap;
int file_wanted, used; int file_wanted, used, cap_used;
int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */ int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */
int issued, implemented, want, retain, revoking, flushing = 0; int issued, implemented, want, retain, revoking, flushing = 0;
int mds = -1; /* keep track of how far we've gone through i_caps list int mds = -1; /* keep track of how far we've gone through i_caps list
...@@ -1563,9 +1571,14 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ...@@ -1563,9 +1571,14 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
/* NOTE: no side-effects allowed, until we take s_mutex */ /* NOTE: no side-effects allowed, until we take s_mutex */
cap_used = used;
if (ci->i_auth_cap && cap != ci->i_auth_cap)
cap_used &= ~ci->i_auth_cap->issued;
revoking = cap->implemented & ~cap->issued; revoking = cap->implemented & ~cap->issued;
dout(" mds%d cap %p issued %s implemented %s revoking %s\n", dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n",
cap->mds, cap, ceph_cap_string(cap->issued), cap->mds, cap, ceph_cap_string(cap->issued),
ceph_cap_string(cap_used),
ceph_cap_string(cap->implemented), ceph_cap_string(cap->implemented),
ceph_cap_string(revoking)); ceph_cap_string(revoking));
...@@ -1593,7 +1606,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ...@@ -1593,7 +1606,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
} }
/* completed revocation? going down and there are no caps? */ /* completed revocation? going down and there are no caps? */
if (revoking && (revoking & used) == 0) { if (revoking && (revoking & cap_used) == 0) {
dout("completed revocation of %s\n", dout("completed revocation of %s\n",
ceph_cap_string(cap->implemented & ~cap->issued)); ceph_cap_string(cap->implemented & ~cap->issued));
goto ack; goto ack;
...@@ -1670,8 +1683,8 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ...@@ -1670,8 +1683,8 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
sent++; sent++;
/* __send_cap drops i_ceph_lock */ /* __send_cap drops i_ceph_lock */
delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, used, want, delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used,
retain, flushing, NULL); want, retain, flushing, NULL);
goto retry; /* retake i_ceph_lock and restart our cap scan. */ goto retry; /* retake i_ceph_lock and restart our cap scan. */
} }
...@@ -2416,7 +2429,9 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ...@@ -2416,7 +2429,9 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
dout("mds wanted %s -> %s\n", dout("mds wanted %s -> %s\n",
ceph_cap_string(le32_to_cpu(grant->wanted)), ceph_cap_string(le32_to_cpu(grant->wanted)),
ceph_cap_string(wanted)); ceph_cap_string(wanted));
grant->wanted = cpu_to_le32(wanted); /* imported cap may not have correct mds_wanted */
if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT)
check_caps = 1;
} }
cap->seq = seq; cap->seq = seq;
...@@ -2820,6 +2835,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -2820,6 +2835,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
(unsigned)seq); (unsigned)seq);
if (op == CEPH_CAP_OP_IMPORT)
ceph_add_cap_releases(mdsc, session);
/* lookup ino */ /* lookup ino */
inode = ceph_find_inode(sb, vino); inode = ceph_find_inode(sb, vino);
ci = ceph_inode(inode); ci = ceph_inode(inode);
......
...@@ -243,6 +243,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, ...@@ -243,6 +243,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
err = ceph_mdsc_do_request(mdsc, err = ceph_mdsc_do_request(mdsc,
(flags & (O_CREAT|O_TRUNC)) ? dir : NULL, (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
req); req);
if (err)
goto out_err;
err = ceph_handle_snapdir(req, dentry, err); err = ceph_handle_snapdir(req, dentry, err);
if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry) if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry); err = ceph_handle_notrace_create(dir, dentry);
...@@ -263,6 +266,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, ...@@ -263,6 +266,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
err = finish_no_open(file, dn); err = finish_no_open(file, dn);
} else { } else {
dout("atomic_open finish_open on dn %p\n", dn); dout("atomic_open finish_open on dn %p\n", dn);
if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
*opened |= FILE_CREATED;
}
err = finish_open(file, dentry, ceph_open, opened); err = finish_open(file, dentry, ceph_open, opened);
} }
......
...@@ -194,7 +194,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) ...@@ -194,7 +194,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
return -EFAULT; return -EFAULT;
down_read(&osdc->map_sem); down_read(&osdc->map_sem);
r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, &len, r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len,
&dl.object_no, &dl.object_offset, &dl.object_no, &dl.object_offset,
&olen); &olen);
if (r < 0) if (r < 0)
......
...@@ -232,6 +232,30 @@ static int parse_reply_info_filelock(void **p, void *end, ...@@ -232,6 +232,30 @@ static int parse_reply_info_filelock(void **p, void *end,
return -EIO; return -EIO;
} }
/*
* parse create results
*/
static int parse_reply_info_create(void **p, void *end,
struct ceph_mds_reply_info_parsed *info,
int features)
{
if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
if (*p == end) {
info->has_create_ino = false;
} else {
info->has_create_ino = true;
info->ino = ceph_decode_64(p);
}
}
if (unlikely(*p != end))
goto bad;
return 0;
bad:
return -EIO;
}
/* /*
* parse extra results * parse extra results
*/ */
...@@ -241,8 +265,12 @@ static int parse_reply_info_extra(void **p, void *end, ...@@ -241,8 +265,12 @@ static int parse_reply_info_extra(void **p, void *end,
{ {
if (info->head->op == CEPH_MDS_OP_GETFILELOCK) if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
return parse_reply_info_filelock(p, end, info, features); return parse_reply_info_filelock(p, end, info, features);
else else if (info->head->op == CEPH_MDS_OP_READDIR)
return parse_reply_info_dir(p, end, info, features); return parse_reply_info_dir(p, end, info, features);
else if (info->head->op == CEPH_MDS_OP_CREATE)
return parse_reply_info_create(p, end, info, features);
else
return -EIO;
} }
/* /*
...@@ -2170,7 +2198,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) ...@@ -2170,7 +2198,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
mutex_lock(&req->r_fill_mutex); mutex_lock(&req->r_fill_mutex);
err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session); err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
if (err == 0) { if (err == 0) {
if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK && if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
req->r_op == CEPH_MDS_OP_LSSNAP) &&
rinfo->dir_nr) rinfo->dir_nr)
ceph_readdir_prepopulate(req, req->r_session); ceph_readdir_prepopulate(req, req->r_session);
ceph_unreserve_caps(mdsc, &req->r_caps_reservation); ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
......
...@@ -74,6 +74,12 @@ struct ceph_mds_reply_info_parsed { ...@@ -74,6 +74,12 @@ struct ceph_mds_reply_info_parsed {
struct ceph_mds_reply_info_in *dir_in; struct ceph_mds_reply_info_in *dir_in;
u8 dir_complete, dir_end; u8 dir_complete, dir_end;
}; };
/* for create results */
struct {
bool has_create_ino;
u64 ino;
};
}; };
/* encoded blob describing snapshot contexts for certain /* encoded blob describing snapshot contexts for certain
......
...@@ -14,13 +14,19 @@ ...@@ -14,13 +14,19 @@
#define CEPH_FEATURE_DIRLAYOUTHASH (1<<7) #define CEPH_FEATURE_DIRLAYOUTHASH (1<<7)
/* bits 8-17 defined by user-space; not supported yet here */ /* bits 8-17 defined by user-space; not supported yet here */
#define CEPH_FEATURE_CRUSH_TUNABLES (1<<18) #define CEPH_FEATURE_CRUSH_TUNABLES (1<<18)
/* bits 19-24 defined by user-space; not supported yet here */
#define CEPH_FEATURE_CRUSH_TUNABLES2 (1<<25)
/* bit 26 defined by user-space; not supported yet here */
#define CEPH_FEATURE_REPLY_CREATE_INODE (1<<27)
/* /*
* Features supported. * Features supported.
*/ */
#define CEPH_FEATURES_SUPPORTED_DEFAULT \ #define CEPH_FEATURES_SUPPORTED_DEFAULT \
(CEPH_FEATURE_NOSRCADDR | \ (CEPH_FEATURE_NOSRCADDR | \
CEPH_FEATURE_CRUSH_TUNABLES) CEPH_FEATURE_CRUSH_TUNABLES | \
CEPH_FEATURE_CRUSH_TUNABLES2 | \
CEPH_FEATURE_REPLY_CREATE_INODE)
#define CEPH_FEATURES_REQUIRED_DEFAULT \ #define CEPH_FEATURES_REQUIRED_DEFAULT \
(CEPH_FEATURE_NOSRCADDR) (CEPH_FEATURE_NOSRCADDR)
......
...@@ -99,8 +99,8 @@ static inline int ceph_has_room(void **p, void *end, size_t n) ...@@ -99,8 +99,8 @@ static inline int ceph_has_room(void **p, void *end, size_t n)
* *
* There are two possible failures: * There are two possible failures:
* - converting the string would require accessing memory at or * - converting the string would require accessing memory at or
* beyond the "end" pointer provided (-E * beyond the "end" pointer provided (-ERANGE)
* - memory could not be allocated for the result * - memory could not be allocated for the result (-ENOMEM)
*/ */
static inline char *ceph_extract_encoded_string(void **p, void *end, static inline char *ceph_extract_encoded_string(void **p, void *end,
size_t *lenp, gfp_t gfp) size_t *lenp, gfp_t gfp)
...@@ -238,6 +238,11 @@ static inline void ceph_encode_string(void **p, void *end, ...@@ -238,6 +238,11 @@ static inline void ceph_encode_string(void **p, void *end,
ceph_encode_need(p, end, sizeof(u16), bad); \ ceph_encode_need(p, end, sizeof(u16), bad); \
ceph_encode_16(p, v); \ ceph_encode_16(p, v); \
} while (0) } while (0)
#define ceph_encode_8_safe(p, end, v, bad) \
do { \
ceph_encode_need(p, end, sizeof(u8), bad); \
ceph_encode_8(p, v); \
} while (0)
#define ceph_encode_copy_safe(p, end, pv, n, bad) \ #define ceph_encode_copy_safe(p, end, pv, n, bad) \
do { \ do { \
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include <linux/ceph/osdmap.h> #include <linux/ceph/osdmap.h>
#include <linux/ceph/messenger.h> #include <linux/ceph/messenger.h>
#include <linux/ceph/auth.h> #include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>
/* /*
* Maximum object name size * Maximum object name size
...@@ -22,7 +23,6 @@ struct ceph_snap_context; ...@@ -22,7 +23,6 @@ struct ceph_snap_context;
struct ceph_osd_request; struct ceph_osd_request;
struct ceph_osd_client; struct ceph_osd_client;
struct ceph_authorizer; struct ceph_authorizer;
struct ceph_pagelist;
/* /*
* completion callback for async writepages * completion callback for async writepages
...@@ -95,7 +95,7 @@ struct ceph_osd_request { ...@@ -95,7 +95,7 @@ struct ceph_osd_request {
struct bio *r_bio; /* instead of pages */ struct bio *r_bio; /* instead of pages */
#endif #endif
struct ceph_pagelist *r_trail; /* trailing part of the data */ struct ceph_pagelist r_trail; /* trailing part of the data */
}; };
struct ceph_osd_event { struct ceph_osd_event {
...@@ -157,7 +157,6 @@ struct ceph_osd_client { ...@@ -157,7 +157,6 @@ struct ceph_osd_client {
struct ceph_osd_req_op { struct ceph_osd_req_op {
u16 op; /* CEPH_OSD_OP_* */ u16 op; /* CEPH_OSD_OP_* */
u32 flags; /* CEPH_OSD_FLAG_* */
union { union {
struct { struct {
u64 offset, length; u64 offset, length;
...@@ -207,29 +206,24 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, ...@@ -207,29 +206,24 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
struct ceph_msg *msg); struct ceph_msg *msg);
extern int ceph_calc_raw_layout(struct ceph_osd_client *osdc, extern int ceph_calc_raw_layout(struct ceph_file_layout *layout,
struct ceph_file_layout *layout,
u64 snapid,
u64 off, u64 *plen, u64 *bno, u64 off, u64 *plen, u64 *bno,
struct ceph_osd_request *req, struct ceph_osd_request *req,
struct ceph_osd_req_op *op); struct ceph_osd_req_op *op);
extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
int flags,
struct ceph_snap_context *snapc, struct ceph_snap_context *snapc,
struct ceph_osd_req_op *ops, unsigned int num_op,
bool use_mempool, bool use_mempool,
gfp_t gfp_flags, gfp_t gfp_flags);
struct page **pages,
struct bio *bio);
extern void ceph_osdc_build_request(struct ceph_osd_request *req, extern void ceph_osdc_build_request(struct ceph_osd_request *req,
u64 off, u64 *plen, u64 off, u64 len,
unsigned int num_op,
struct ceph_osd_req_op *src_ops, struct ceph_osd_req_op *src_ops,
struct ceph_snap_context *snapc, struct ceph_snap_context *snapc,
struct timespec *mtime, u64 snap_id,
const char *oid, struct timespec *mtime);
int oid_len);
extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
struct ceph_file_layout *layout, struct ceph_file_layout *layout,
......
...@@ -110,7 +110,7 @@ extern void ceph_osdmap_destroy(struct ceph_osdmap *map); ...@@ -110,7 +110,7 @@ extern void ceph_osdmap_destroy(struct ceph_osdmap *map);
/* calculate mapping of a file extent to an object */ /* calculate mapping of a file extent to an object */
extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, extern int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
u64 off, u64 *plen, u64 off, u64 len,
u64 *bno, u64 *oxoff, u64 *oxlen); u64 *bno, u64 *oxoff, u64 *oxlen);
/* calculate mapping of object to a placement group */ /* calculate mapping of object to a placement group */
......
...@@ -162,6 +162,8 @@ struct crush_map { ...@@ -162,6 +162,8 @@ struct crush_map {
__u32 choose_local_fallback_tries; __u32 choose_local_fallback_tries;
/* choose attempts before giving up */ /* choose attempts before giving up */
__u32 choose_total_tries; __u32 choose_total_tries;
/* attempt chooseleaf inner descent once; on failure retry outer descent */
__u32 chooseleaf_descend_once;
}; };
......
...@@ -287,6 +287,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in ...@@ -287,6 +287,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in
* @outpos: our position in that vector * @outpos: our position in that vector
* @firstn: true if choosing "first n" items, false if choosing "indep" * @firstn: true if choosing "first n" items, false if choosing "indep"
* @recurse_to_leaf: true if we want one device under each item of given type * @recurse_to_leaf: true if we want one device under each item of given type
* @descend_once: true if we should only try one descent before giving up
* @out2: second output vector for leaf items (if @recurse_to_leaf) * @out2: second output vector for leaf items (if @recurse_to_leaf)
*/ */
static int crush_choose(const struct crush_map *map, static int crush_choose(const struct crush_map *map,
...@@ -295,7 +296,7 @@ static int crush_choose(const struct crush_map *map, ...@@ -295,7 +296,7 @@ static int crush_choose(const struct crush_map *map,
int x, int numrep, int type, int x, int numrep, int type,
int *out, int outpos, int *out, int outpos,
int firstn, int recurse_to_leaf, int firstn, int recurse_to_leaf,
int *out2) int descend_once, int *out2)
{ {
int rep; int rep;
unsigned int ftotal, flocal; unsigned int ftotal, flocal;
...@@ -391,7 +392,7 @@ static int crush_choose(const struct crush_map *map, ...@@ -391,7 +392,7 @@ static int crush_choose(const struct crush_map *map,
} }
reject = 0; reject = 0;
if (recurse_to_leaf) { if (!collide && recurse_to_leaf) {
if (item < 0) { if (item < 0) {
if (crush_choose(map, if (crush_choose(map,
map->buckets[-1-item], map->buckets[-1-item],
...@@ -399,6 +400,7 @@ static int crush_choose(const struct crush_map *map, ...@@ -399,6 +400,7 @@ static int crush_choose(const struct crush_map *map,
x, outpos+1, 0, x, outpos+1, 0,
out2, outpos, out2, outpos,
firstn, 0, firstn, 0,
map->chooseleaf_descend_once,
NULL) <= outpos) NULL) <= outpos)
/* didn't get leaf */ /* didn't get leaf */
reject = 1; reject = 1;
...@@ -422,7 +424,10 @@ static int crush_choose(const struct crush_map *map, ...@@ -422,7 +424,10 @@ static int crush_choose(const struct crush_map *map,
ftotal++; ftotal++;
flocal++; flocal++;
if (collide && flocal <= map->choose_local_tries) if (reject && descend_once)
/* let outer call try again */
skip_rep = 1;
else if (collide && flocal <= map->choose_local_tries)
/* retry locally a few times */ /* retry locally a few times */
retry_bucket = 1; retry_bucket = 1;
else if (map->choose_local_fallback_tries > 0 && else if (map->choose_local_fallback_tries > 0 &&
...@@ -485,6 +490,7 @@ int crush_do_rule(const struct crush_map *map, ...@@ -485,6 +490,7 @@ int crush_do_rule(const struct crush_map *map,
int i, j; int i, j;
int numrep; int numrep;
int firstn; int firstn;
const int descend_once = 0;
if ((__u32)ruleno >= map->max_rules) { if ((__u32)ruleno >= map->max_rules) {
dprintk(" bad ruleno %d\n", ruleno); dprintk(" bad ruleno %d\n", ruleno);
...@@ -544,7 +550,8 @@ int crush_do_rule(const struct crush_map *map, ...@@ -544,7 +550,8 @@ int crush_do_rule(const struct crush_map *map,
curstep->arg2, curstep->arg2,
o+osize, j, o+osize, j,
firstn, firstn,
recurse_to_leaf, c+osize); recurse_to_leaf,
descend_once, c+osize);
} }
if (recurse_to_leaf) if (recurse_to_leaf)
......
...@@ -32,52 +32,43 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc, ...@@ -32,52 +32,43 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
static void __send_request(struct ceph_osd_client *osdc, static void __send_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req); struct ceph_osd_request *req);
static int op_needs_trail(int op)
{
switch (op) {
case CEPH_OSD_OP_GETXATTR:
case CEPH_OSD_OP_SETXATTR:
case CEPH_OSD_OP_CMPXATTR:
case CEPH_OSD_OP_CALL:
case CEPH_OSD_OP_NOTIFY:
return 1;
default:
return 0;
}
}
static int op_has_extent(int op) static int op_has_extent(int op)
{ {
return (op == CEPH_OSD_OP_READ || return (op == CEPH_OSD_OP_READ ||
op == CEPH_OSD_OP_WRITE); op == CEPH_OSD_OP_WRITE);
} }
int ceph_calc_raw_layout(struct ceph_osd_client *osdc, int ceph_calc_raw_layout(struct ceph_file_layout *layout,
struct ceph_file_layout *layout,
u64 snapid,
u64 off, u64 *plen, u64 *bno, u64 off, u64 *plen, u64 *bno,
struct ceph_osd_request *req, struct ceph_osd_request *req,
struct ceph_osd_req_op *op) struct ceph_osd_req_op *op)
{ {
struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
u64 orig_len = *plen; u64 orig_len = *plen;
u64 objoff, objlen; /* extent in object */ u64 objoff, objlen; /* extent in object */
int r; int r;
reqhead->snapid = cpu_to_le64(snapid);
/* object extent? */ /* object extent? */
r = ceph_calc_file_object_mapping(layout, off, plen, bno, r = ceph_calc_file_object_mapping(layout, off, orig_len, bno,
&objoff, &objlen); &objoff, &objlen);
if (r < 0) if (r < 0)
return r; return r;
if (*plen < orig_len) if (objlen < orig_len) {
*plen = objlen;
dout(" skipping last %llu, final file extent %llu~%llu\n", dout(" skipping last %llu, final file extent %llu~%llu\n",
orig_len - *plen, off, *plen); orig_len - *plen, off, *plen);
}
if (op_has_extent(op->op)) { if (op_has_extent(op->op)) {
u32 osize = le32_to_cpu(layout->fl_object_size);
op->extent.offset = objoff; op->extent.offset = objoff;
op->extent.length = objlen; op->extent.length = objlen;
if (op->extent.truncate_size <= off - objoff) {
op->extent.truncate_size = 0;
} else {
op->extent.truncate_size -= off - objoff;
if (op->extent.truncate_size > osize)
op->extent.truncate_size = osize;
}
} }
req->r_num_pages = calc_pages_for(off, *plen); req->r_num_pages = calc_pages_for(off, *plen);
req->r_page_alignment = off & ~PAGE_MASK; req->r_page_alignment = off & ~PAGE_MASK;
...@@ -115,8 +106,7 @@ EXPORT_SYMBOL(ceph_calc_raw_layout); ...@@ -115,8 +106,7 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
* *
* fill osd op in request message. * fill osd op in request message.
*/ */
static int calc_layout(struct ceph_osd_client *osdc, static int calc_layout(struct ceph_vino vino,
struct ceph_vino vino,
struct ceph_file_layout *layout, struct ceph_file_layout *layout,
u64 off, u64 *plen, u64 off, u64 *plen,
struct ceph_osd_request *req, struct ceph_osd_request *req,
...@@ -125,8 +115,7 @@ static int calc_layout(struct ceph_osd_client *osdc, ...@@ -125,8 +115,7 @@ static int calc_layout(struct ceph_osd_client *osdc,
u64 bno; u64 bno;
int r; int r;
r = ceph_calc_raw_layout(osdc, layout, vino.snap, off, r = ceph_calc_raw_layout(layout, off, plen, &bno, req, op);
plen, &bno, req, op);
if (r < 0) if (r < 0)
return r; return r;
...@@ -163,10 +152,7 @@ void ceph_osdc_release_request(struct kref *kref) ...@@ -163,10 +152,7 @@ void ceph_osdc_release_request(struct kref *kref)
bio_put(req->r_bio); bio_put(req->r_bio);
#endif #endif
ceph_put_snap_context(req->r_snapc); ceph_put_snap_context(req->r_snapc);
if (req->r_trail) { ceph_pagelist_release(&req->r_trail);
ceph_pagelist_release(req->r_trail);
kfree(req->r_trail);
}
if (req->r_mempool) if (req->r_mempool)
mempool_free(req, req->r_osdc->req_mempool); mempool_free(req, req->r_osdc->req_mempool);
else else
...@@ -174,34 +160,14 @@ void ceph_osdc_release_request(struct kref *kref) ...@@ -174,34 +160,14 @@ void ceph_osdc_release_request(struct kref *kref)
} }
EXPORT_SYMBOL(ceph_osdc_release_request); EXPORT_SYMBOL(ceph_osdc_release_request);
static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
{
int i = 0;
if (needs_trail)
*needs_trail = 0;
while (ops[i].op) {
if (needs_trail && op_needs_trail(ops[i].op))
*needs_trail = 1;
i++;
}
return i;
}
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
int flags,
struct ceph_snap_context *snapc, struct ceph_snap_context *snapc,
struct ceph_osd_req_op *ops, unsigned int num_op,
bool use_mempool, bool use_mempool,
gfp_t gfp_flags, gfp_t gfp_flags)
struct page **pages,
struct bio *bio)
{ {
struct ceph_osd_request *req; struct ceph_osd_request *req;
struct ceph_msg *msg; struct ceph_msg *msg;
int needs_trail;
int num_op = get_num_ops(ops, &needs_trail);
size_t msg_size = sizeof(struct ceph_osd_request_head); size_t msg_size = sizeof(struct ceph_osd_request_head);
msg_size += num_op*sizeof(struct ceph_osd_op); msg_size += num_op*sizeof(struct ceph_osd_op);
...@@ -228,10 +194,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, ...@@ -228,10 +194,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
INIT_LIST_HEAD(&req->r_req_lru_item); INIT_LIST_HEAD(&req->r_req_lru_item);
INIT_LIST_HEAD(&req->r_osd_item); INIT_LIST_HEAD(&req->r_osd_item);
req->r_flags = flags;
WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
/* create reply message */ /* create reply message */
if (use_mempool) if (use_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
...@@ -244,15 +206,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, ...@@ -244,15 +206,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
} }
req->r_reply = msg; req->r_reply = msg;
/* allocate space for the trailing data */ ceph_pagelist_init(&req->r_trail);
if (needs_trail) {
req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
if (!req->r_trail) {
ceph_osdc_put_request(req);
return NULL;
}
ceph_pagelist_init(req->r_trail);
}
/* create request message; allow space for oid */ /* create request message; allow space for oid */
msg_size += MAX_OBJ_NAME_SIZE; msg_size += MAX_OBJ_NAME_SIZE;
...@@ -270,13 +224,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, ...@@ -270,13 +224,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
memset(msg->front.iov_base, 0, msg->front.iov_len); memset(msg->front.iov_base, 0, msg->front.iov_len);
req->r_request = msg; req->r_request = msg;
req->r_pages = pages;
#ifdef CONFIG_BLOCK
if (bio) {
req->r_bio = bio;
bio_get(req->r_bio);
}
#endif
return req; return req;
} }
...@@ -304,29 +251,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req, ...@@ -304,29 +251,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
case CEPH_OSD_OP_GETXATTR: case CEPH_OSD_OP_GETXATTR:
case CEPH_OSD_OP_SETXATTR: case CEPH_OSD_OP_SETXATTR:
case CEPH_OSD_OP_CMPXATTR: case CEPH_OSD_OP_CMPXATTR:
BUG_ON(!req->r_trail);
dst->xattr.name_len = cpu_to_le32(src->xattr.name_len); dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
dst->xattr.cmp_op = src->xattr.cmp_op; dst->xattr.cmp_op = src->xattr.cmp_op;
dst->xattr.cmp_mode = src->xattr.cmp_mode; dst->xattr.cmp_mode = src->xattr.cmp_mode;
ceph_pagelist_append(req->r_trail, src->xattr.name, ceph_pagelist_append(&req->r_trail, src->xattr.name,
src->xattr.name_len); src->xattr.name_len);
ceph_pagelist_append(req->r_trail, src->xattr.val, ceph_pagelist_append(&req->r_trail, src->xattr.val,
src->xattr.value_len); src->xattr.value_len);
break; break;
case CEPH_OSD_OP_CALL: case CEPH_OSD_OP_CALL:
BUG_ON(!req->r_trail);
dst->cls.class_len = src->cls.class_len; dst->cls.class_len = src->cls.class_len;
dst->cls.method_len = src->cls.method_len; dst->cls.method_len = src->cls.method_len;
dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
ceph_pagelist_append(req->r_trail, src->cls.class_name, ceph_pagelist_append(&req->r_trail, src->cls.class_name,
src->cls.class_len); src->cls.class_len);
ceph_pagelist_append(req->r_trail, src->cls.method_name, ceph_pagelist_append(&req->r_trail, src->cls.method_name,
src->cls.method_len); src->cls.method_len);
ceph_pagelist_append(req->r_trail, src->cls.indata, ceph_pagelist_append(&req->r_trail, src->cls.indata,
src->cls.indata_len); src->cls.indata_len);
break; break;
case CEPH_OSD_OP_ROLLBACK: case CEPH_OSD_OP_ROLLBACK:
...@@ -339,11 +282,9 @@ static void osd_req_encode_op(struct ceph_osd_request *req, ...@@ -339,11 +282,9 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
__le32 prot_ver = cpu_to_le32(src->watch.prot_ver); __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
__le32 timeout = cpu_to_le32(src->watch.timeout); __le32 timeout = cpu_to_le32(src->watch.timeout);
BUG_ON(!req->r_trail); ceph_pagelist_append(&req->r_trail,
ceph_pagelist_append(req->r_trail,
&prot_ver, sizeof(prot_ver)); &prot_ver, sizeof(prot_ver));
ceph_pagelist_append(req->r_trail, ceph_pagelist_append(&req->r_trail,
&timeout, sizeof(timeout)); &timeout, sizeof(timeout));
} }
case CEPH_OSD_OP_NOTIFY_ACK: case CEPH_OSD_OP_NOTIFY_ACK:
...@@ -365,25 +306,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req, ...@@ -365,25 +306,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
* *
*/ */
void ceph_osdc_build_request(struct ceph_osd_request *req, void ceph_osdc_build_request(struct ceph_osd_request *req,
u64 off, u64 *plen, u64 off, u64 len, unsigned int num_op,
struct ceph_osd_req_op *src_ops, struct ceph_osd_req_op *src_ops,
struct ceph_snap_context *snapc, struct ceph_snap_context *snapc, u64 snap_id,
struct timespec *mtime, struct timespec *mtime)
const char *oid,
int oid_len)
{ {
struct ceph_msg *msg = req->r_request; struct ceph_msg *msg = req->r_request;
struct ceph_osd_request_head *head; struct ceph_osd_request_head *head;
struct ceph_osd_req_op *src_op; struct ceph_osd_req_op *src_op;
struct ceph_osd_op *op; struct ceph_osd_op *op;
void *p; void *p;
int num_op = get_num_ops(src_ops, NULL);
size_t msg_size = sizeof(*head) + num_op*sizeof(*op); size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
int flags = req->r_flags; int flags = req->r_flags;
u64 data_len = 0; u64 data_len = 0;
int i; int i;
WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
head = msg->front.iov_base; head = msg->front.iov_base;
head->snapid = cpu_to_le64(snap_id);
op = (void *)(head + 1); op = (void *)(head + 1);
p = (void *)(op + num_op); p = (void *)(op + num_op);
...@@ -393,23 +334,19 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, ...@@ -393,23 +334,19 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
head->flags = cpu_to_le32(flags); head->flags = cpu_to_le32(flags);
if (flags & CEPH_OSD_FLAG_WRITE) if (flags & CEPH_OSD_FLAG_WRITE)
ceph_encode_timespec(&head->mtime, mtime); ceph_encode_timespec(&head->mtime, mtime);
BUG_ON(num_op > (unsigned int) ((u16) -1));
head->num_ops = cpu_to_le16(num_op); head->num_ops = cpu_to_le16(num_op);
/* fill in oid */ /* fill in oid */
head->object_len = cpu_to_le32(oid_len); head->object_len = cpu_to_le32(req->r_oid_len);
memcpy(p, oid, oid_len); memcpy(p, req->r_oid, req->r_oid_len);
p += oid_len; p += req->r_oid_len;
src_op = src_ops; src_op = src_ops;
while (src_op->op) { while (num_op--)
osd_req_encode_op(req, op, src_op); osd_req_encode_op(req, op++, src_op++);
src_op++;
op++;
}
if (req->r_trail) data_len += req->r_trail.length;
data_len += req->r_trail->length;
if (snapc) { if (snapc) {
head->snap_seq = cpu_to_le64(snapc->seq); head->snap_seq = cpu_to_le64(snapc->seq);
...@@ -422,7 +359,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, ...@@ -422,7 +359,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
if (flags & CEPH_OSD_FLAG_WRITE) { if (flags & CEPH_OSD_FLAG_WRITE) {
req->r_request->hdr.data_off = cpu_to_le16(off); req->r_request->hdr.data_off = cpu_to_le16(off);
req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len); req->r_request->hdr.data_len = cpu_to_le32(len + data_len);
} else if (data_len) { } else if (data_len) {
req->r_request->hdr.data_off = 0; req->r_request->hdr.data_off = 0;
req->r_request->hdr.data_len = cpu_to_le32(data_len); req->r_request->hdr.data_len = cpu_to_le32(data_len);
...@@ -462,31 +399,30 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, ...@@ -462,31 +399,30 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
bool use_mempool, int num_reply, bool use_mempool, int num_reply,
int page_align) int page_align)
{ {
struct ceph_osd_req_op ops[3]; struct ceph_osd_req_op ops[2];
struct ceph_osd_request *req; struct ceph_osd_request *req;
unsigned int num_op = 1;
int r; int r;
memset(&ops, 0, sizeof ops);
ops[0].op = opcode; ops[0].op = opcode;
ops[0].extent.truncate_seq = truncate_seq; ops[0].extent.truncate_seq = truncate_seq;
ops[0].extent.truncate_size = truncate_size; ops[0].extent.truncate_size = truncate_size;
ops[0].payload_len = 0;
if (do_sync) { if (do_sync) {
ops[1].op = CEPH_OSD_OP_STARTSYNC; ops[1].op = CEPH_OSD_OP_STARTSYNC;
ops[1].payload_len = 0; num_op++;
ops[2].op = 0; }
} else
ops[1].op = 0; req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
GFP_NOFS);
req = ceph_osdc_alloc_request(osdc, flags,
snapc, ops,
use_mempool,
GFP_NOFS, NULL, NULL);
if (!req) if (!req)
return ERR_PTR(-ENOMEM); return ERR_PTR(-ENOMEM);
req->r_flags = flags;
/* calculate max write size */ /* calculate max write size */
r = calc_layout(osdc, vino, layout, off, plen, req, ops); r = calc_layout(vino, layout, off, plen, req, ops);
if (r < 0) if (r < 0)
return ERR_PTR(r); return ERR_PTR(r);
req->r_file_layout = *layout; /* keep a copy */ req->r_file_layout = *layout; /* keep a copy */
...@@ -496,10 +432,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, ...@@ -496,10 +432,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
req->r_num_pages = calc_pages_for(page_align, *plen); req->r_num_pages = calc_pages_for(page_align, *plen);
req->r_page_alignment = page_align; req->r_page_alignment = page_align;
ceph_osdc_build_request(req, off, plen, ops, ceph_osdc_build_request(req, off, *plen, num_op, ops,
snapc, snapc, vino.snap, mtime);
mtime,
req->r_oid, req->r_oid_len);
return req; return req;
} }
...@@ -739,31 +673,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc) ...@@ -739,31 +673,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
*/ */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{ {
struct ceph_osd_request *req; struct ceph_entity_addr *peer_addr;
int ret = 0;
dout("__reset_osd %p osd%d\n", osd, osd->o_osd); dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
if (list_empty(&osd->o_requests) && if (list_empty(&osd->o_requests) &&
list_empty(&osd->o_linger_requests)) { list_empty(&osd->o_linger_requests)) {
__remove_osd(osdc, osd); __remove_osd(osdc, osd);
ret = -ENODEV;
} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], return -ENODEV;
&osd->o_con.peer_addr, }
sizeof(osd->o_con.peer_addr)) == 0 &&
peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
!ceph_con_opened(&osd->o_con)) { !ceph_con_opened(&osd->o_con)) {
struct ceph_osd_request *req;
dout(" osd addr hasn't changed and connection never opened," dout(" osd addr hasn't changed and connection never opened,"
" letting msgr retry"); " letting msgr retry");
/* touch each r_stamp for handle_timeout()'s benfit */ /* touch each r_stamp for handle_timeout()'s benfit */
list_for_each_entry(req, &osd->o_requests, r_osd_item) list_for_each_entry(req, &osd->o_requests, r_osd_item)
req->r_stamp = jiffies; req->r_stamp = jiffies;
ret = -EAGAIN;
} else { return -EAGAIN;
}
ceph_con_close(&osd->o_con); ceph_con_close(&osd->o_con);
ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
&osdc->osdmap->osd_addr[osd->o_osd]);
osd->o_incarnation++; osd->o_incarnation++;
}
return ret; return 0;
} }
static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
...@@ -1706,7 +1644,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, ...@@ -1706,7 +1644,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
req->r_request->bio = req->r_bio; req->r_request->bio = req->r_bio;
#endif #endif
req->r_request->trail = req->r_trail; req->r_request->trail = &req->r_trail;
register_request(osdc, req); register_request(osdc, req);
......
...@@ -13,26 +13,18 @@ ...@@ -13,26 +13,18 @@
char *ceph_osdmap_state_str(char *str, int len, int state) char *ceph_osdmap_state_str(char *str, int len, int state)
{ {
int flag = 0;
if (!len) if (!len)
goto done; return str;
*str = '\0'; if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
if (state) { snprintf(str, len, "exists, up");
if (state & CEPH_OSD_EXISTS) { else if (state & CEPH_OSD_EXISTS)
snprintf(str, len, "exists"); snprintf(str, len, "exists");
flag = 1; else if (state & CEPH_OSD_UP)
} snprintf(str, len, "up");
if (state & CEPH_OSD_UP) { else
snprintf(str, len, "%s%s%s", str, (flag ? ", " : ""),
"up");
flag = 1;
}
} else {
snprintf(str, len, "doesn't exist"); snprintf(str, len, "doesn't exist");
}
done:
return str; return str;
} }
...@@ -170,6 +162,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end) ...@@ -170,6 +162,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
c->choose_local_tries = 2; c->choose_local_tries = 2;
c->choose_local_fallback_tries = 5; c->choose_local_fallback_tries = 5;
c->choose_total_tries = 19; c->choose_total_tries = 19;
c->chooseleaf_descend_once = 0;
ceph_decode_need(p, end, 4*sizeof(u32), bad); ceph_decode_need(p, end, 4*sizeof(u32), bad);
magic = ceph_decode_32(p); magic = ceph_decode_32(p);
...@@ -336,6 +329,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end) ...@@ -336,6 +329,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
dout("crush decode tunable choose_total_tries = %d", dout("crush decode tunable choose_total_tries = %d",
c->choose_total_tries); c->choose_total_tries);
ceph_decode_need(p, end, sizeof(u32), done);
c->chooseleaf_descend_once = ceph_decode_32(p);
dout("crush decode tunable chooseleaf_descend_once = %d",
c->chooseleaf_descend_once);
done: done:
dout("crush_decode success\n"); dout("crush_decode success\n");
return c; return c;
...@@ -1010,7 +1008,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, ...@@ -1010,7 +1008,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
* pass a stride back to the caller. * pass a stride back to the caller.
*/ */
int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
u64 off, u64 *plen, u64 off, u64 len,
u64 *ono, u64 *ono,
u64 *oxoff, u64 *oxlen) u64 *oxoff, u64 *oxlen)
{ {
...@@ -1021,7 +1019,7 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, ...@@ -1021,7 +1019,7 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
u32 su_per_object; u32 su_per_object;
u64 t, su_offset; u64 t, su_offset;
dout("mapping %llu~%llu osize %u fl_su %u\n", off, *plen, dout("mapping %llu~%llu osize %u fl_su %u\n", off, len,
osize, su); osize, su);
if (su == 0 || sc == 0) if (su == 0 || sc == 0)
goto invalid; goto invalid;
...@@ -1054,11 +1052,10 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, ...@@ -1054,11 +1052,10 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
/* /*
* Calculate the length of the extent being written to the selected * Calculate the length of the extent being written to the selected
* object. This is the minimum of the full length requested (plen) or * object. This is the minimum of the full length requested (len) or
* the remainder of the current stripe being written to. * the remainder of the current stripe being written to.
*/ */
*oxlen = min_t(u64, *plen, su - su_offset); *oxlen = min_t(u64, len, su - su_offset);
*plen = *oxlen;
dout(" obj extent %llu~%llu\n", *oxoff, *oxlen); dout(" obj extent %llu~%llu\n", *oxoff, *oxlen);
return 0; return 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment