Commit c5b5ef6c authored by Alex Elder's avatar Alex Elder Committed by Sage Weil

rbd: issue stat request before layered write

This is a step toward fully implementing layered writes.

Add checks before request submission for the object(s) associated
with an image request.  For write requests, if we don't know that
the target object exists, issue a STAT request to find out.  When
that request completes, mark the known and exists flags for the
original object request accordingly and re-submit the object
request.  (Note that this still does the existence check only; the
copyup operation is not yet done.)

A new object request is created to perform the existence check.  A
pointer to the original request is added to that object request to
allow the stat request to re-issue the original request after
updating its flags.  If there is a failure with the stat request
the error code is stored with the original request, which is then
completed.

This resolves:
    http://tracker.ceph.com/issues/3418Signed-off-by: default avatarAlex Elder <elder@inktank.com>
Reviewed-by: default avatarJosh Durgin <josh.durgin@inktank.com>
parent 5679c59f
...@@ -183,9 +183,31 @@ struct rbd_obj_request { ...@@ -183,9 +183,31 @@ struct rbd_obj_request {
u64 length; /* bytes from offset */ u64 length; /* bytes from offset */
unsigned long flags; unsigned long flags;
struct rbd_img_request *img_request; /*
u64 img_offset; /* image relative offset */ * An object request associated with an image will have its
struct list_head links; /* img_request->obj_requests */ * img_data flag set; a standalone object request will not.
*
* A standalone object request will have which == BAD_WHICH
* and a null obj_request pointer.
*
* An object request initiated in support of a layered image
* object (to check for its existence before a write) will
* have which == BAD_WHICH and a non-null obj_request pointer.
*
* Finally, an object request for rbd image data will have
* which != BAD_WHICH, and will have a non-null img_request
* pointer. The value of which will be in the range
* 0..(img_request->obj_request_count-1).
*/
union {
struct rbd_obj_request *obj_request; /* STAT op */
struct {
struct rbd_img_request *img_request;
u64 img_offset;
/* links for img_request->obj_requests list */
struct list_head links;
};
};
u32 which; /* posn image request list */ u32 which; /* posn image request list */
enum obj_request_type type; enum obj_request_type type;
...@@ -1656,10 +1678,6 @@ static struct rbd_img_request *rbd_img_request_create( ...@@ -1656,10 +1678,6 @@ static struct rbd_img_request *rbd_img_request_create(
INIT_LIST_HEAD(&img_request->obj_requests); INIT_LIST_HEAD(&img_request->obj_requests);
kref_init(&img_request->kref); kref_init(&img_request->kref);
(void) obj_request_existence_set;
(void) obj_request_known_test;
(void) obj_request_exists_test;
rbd_img_request_get(img_request); /* Avoid a warning */ rbd_img_request_get(img_request); /* Avoid a warning */
rbd_img_request_put(img_request); /* TEMPORARY */ rbd_img_request_put(img_request); /* TEMPORARY */
...@@ -1847,18 +1865,147 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request, ...@@ -1847,18 +1865,147 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
return -ENOMEM; return -ENOMEM;
} }
static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
{
struct rbd_device *rbd_dev;
struct ceph_osd_client *osdc;
struct rbd_obj_request *orig_request;
int result;
rbd_assert(!obj_request_img_data_test(obj_request));
/*
* All we need from the object request is the original
* request and the result of the STAT op. Grab those, then
* we're done with the request.
*/
orig_request = obj_request->obj_request;
obj_request->obj_request = NULL;
rbd_assert(orig_request);
rbd_assert(orig_request->img_request);
result = obj_request->result;
obj_request->result = 0;
dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
obj_request, orig_request, result,
obj_request->xferred, obj_request->length);
rbd_obj_request_put(obj_request);
rbd_assert(orig_request);
rbd_assert(orig_request->img_request);
rbd_dev = orig_request->img_request->rbd_dev;
osdc = &rbd_dev->rbd_client->client->osdc;
/*
* Our only purpose here is to determine whether the object
* exists, and we don't want to treat the non-existence as
* an error. If something else comes back, transfer the
* error to the original request and complete it now.
*/
if (!result) {
obj_request_existence_set(orig_request, true);
} else if (result == -ENOENT) {
obj_request_existence_set(orig_request, false);
} else if (result) {
orig_request->result = result;
goto out_err;
}
/*
* Resubmit the original request now that we have recorded
* whether the target object exists.
*/
orig_request->result = rbd_obj_request_submit(osdc, orig_request);
out_err:
if (orig_request->result)
rbd_obj_request_complete(orig_request);
rbd_obj_request_put(orig_request);
}
static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
{
struct rbd_obj_request *stat_request;
struct rbd_device *rbd_dev;
struct ceph_osd_client *osdc;
struct page **pages = NULL;
u32 page_count;
size_t size;
int ret;
/*
* The response data for a STAT call consists of:
* le64 length;
* struct {
* le32 tv_sec;
* le32 tv_nsec;
* } mtime;
*/
size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
page_count = (u32)calc_pages_for(0, size);
pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
if (IS_ERR(pages))
return PTR_ERR(pages);
ret = -ENOMEM;
stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
OBJ_REQUEST_PAGES);
if (!stat_request)
goto out;
rbd_obj_request_get(obj_request);
stat_request->obj_request = obj_request;
stat_request->pages = pages;
stat_request->page_count = page_count;
rbd_assert(obj_request->img_request);
rbd_dev = obj_request->img_request->rbd_dev;
stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
stat_request);
if (!stat_request->osd_req)
goto out;
stat_request->callback = rbd_img_obj_exists_callback;
osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
false, false);
rbd_osd_req_format(stat_request, false);
osdc = &rbd_dev->rbd_client->client->osdc;
ret = rbd_obj_request_submit(osdc, stat_request);
out:
if (ret)
rbd_obj_request_put(obj_request);
return ret;
}
static int rbd_img_request_submit(struct rbd_img_request *img_request) static int rbd_img_request_submit(struct rbd_img_request *img_request)
{ {
struct rbd_device *rbd_dev = img_request->rbd_dev; struct rbd_device *rbd_dev = img_request->rbd_dev;
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request; struct rbd_obj_request *obj_request;
struct rbd_obj_request *next_obj_request; struct rbd_obj_request *next_obj_request;
bool write_request = img_request_write_test(img_request);
bool layered = img_request_layered_test(img_request);
dout("%s: img %p\n", __func__, img_request); dout("%s: img %p\n", __func__, img_request);
for_each_obj_request_safe(img_request, obj_request, next_obj_request) { for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
bool known;
bool object_exists;
int ret; int ret;
ret = rbd_obj_request_submit(osdc, obj_request); /*
* We need to know whether the target object exists
* for a layered write. Issue an existence check
* first if we need to.
*/
known = obj_request_known_test(obj_request);
object_exists = known && obj_request_exists_test(obj_request);
if (!write_request || !layered || object_exists)
ret = rbd_obj_request_submit(osdc, obj_request);
else
ret = rbd_img_obj_exists_submit(obj_request);
if (ret) if (ret)
return ret; return ret;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment