Commit 2ac2b7a6 authored by Alex Elder's avatar Alex Elder Committed by Sage Weil

libceph: distinguish page and bio requests

An osd request uses either pages or a bio list for its data.  Use a
union to record information about the two, and add a data type
tag to select between them.
Signed-off-by: default avatarAlex Elder <elder@inktank.com>
Reviewed-by: default avatarJosh Durgin <josh.durgin@inktank.com>
parent 2794a82a
...@@ -1425,12 +1425,16 @@ static struct ceph_osd_request *rbd_osd_req_create( ...@@ -1425,12 +1425,16 @@ static struct ceph_osd_request *rbd_osd_req_create(
break; /* Nothing to do */ break; /* Nothing to do */
case OBJ_REQUEST_BIO: case OBJ_REQUEST_BIO:
rbd_assert(obj_request->bio_list != NULL); rbd_assert(obj_request->bio_list != NULL);
osd_req->r_data.type = CEPH_OSD_DATA_TYPE_BIO;
osd_req->r_data.bio = obj_request->bio_list; osd_req->r_data.bio = obj_request->bio_list;
break; break;
case OBJ_REQUEST_PAGES: case OBJ_REQUEST_PAGES:
osd_req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES;
osd_req->r_data.pages = obj_request->pages; osd_req->r_data.pages = obj_request->pages;
osd_req->r_data.num_pages = obj_request->page_count; osd_req->r_data.num_pages = obj_request->page_count;
osd_req->r_data.alignment = offset & ~PAGE_MASK; osd_req->r_data.alignment = offset & ~PAGE_MASK;
osd_req->r_data.pages_from_pool = false;
osd_req->r_data.own_pages = false;
break; break;
} }
......
...@@ -243,6 +243,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) ...@@ -243,6 +243,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes); dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
/* unlock all pages, zeroing any data we didn't read */ /* unlock all pages, zeroing any data we didn't read */
BUG_ON(req->r_data.type != CEPH_OSD_DATA_TYPE_PAGES);
for (i = 0; i < req->r_data.num_pages; i++, bytes -= PAGE_CACHE_SIZE) { for (i = 0; i < req->r_data.num_pages; i++, bytes -= PAGE_CACHE_SIZE) {
struct page *page = req->r_data.pages[i]; struct page *page = req->r_data.pages[i];
...@@ -336,6 +337,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) ...@@ -336,6 +337,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
} }
pages[i] = page; pages[i] = page;
} }
req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES;
req->r_data.pages = pages; req->r_data.pages = pages;
req->r_data.num_pages = nr_pages; req->r_data.num_pages = nr_pages;
req->r_data.alignment = 0; req->r_data.alignment = 0;
...@@ -561,6 +563,7 @@ static void writepages_finish(struct ceph_osd_request *req, ...@@ -561,6 +563,7 @@ static void writepages_finish(struct ceph_osd_request *req,
long writeback_stat; long writeback_stat;
unsigned issued = ceph_caps_issued(ci); unsigned issued = ceph_caps_issued(ci);
BUG_ON(req->r_data.type != CEPH_OSD_DATA_TYPE_PAGES);
if (rc >= 0) { if (rc >= 0) {
/* /*
* Assume we wrote the pages we originally sent. The * Assume we wrote the pages we originally sent. The
...@@ -830,6 +833,7 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -830,6 +833,7 @@ static int ceph_writepages_start(struct address_space *mapping,
break; break;
} }
req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES;
req->r_data.num_pages = calc_pages_for(0, len); req->r_data.num_pages = calc_pages_for(0, len);
req->r_data.alignment = 0; req->r_data.alignment = 0;
max_pages = req->r_data.num_pages; max_pages = req->r_data.num_pages;
......
...@@ -571,6 +571,7 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, ...@@ -571,6 +571,7 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
req->r_data.own_pages = 1; req->r_data.own_pages = 1;
} }
} }
req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES;
req->r_data.pages = pages; req->r_data.pages = pages;
req->r_data.num_pages = num_pages; req->r_data.num_pages = num_pages;
req->r_data.alignment = page_align; req->r_data.alignment = page_align;
......
...@@ -50,8 +50,17 @@ struct ceph_osd { ...@@ -50,8 +50,17 @@ struct ceph_osd {
#define CEPH_OSD_MAX_OP 10 #define CEPH_OSD_MAX_OP 10
enum ceph_osd_data_type {
CEPH_OSD_DATA_TYPE_NONE,
CEPH_OSD_DATA_TYPE_PAGES,
#ifdef CONFIG_BLOCK
CEPH_OSD_DATA_TYPE_BIO,
#endif /* CONFIG_BLOCK */
};
struct ceph_osd_data { struct ceph_osd_data {
struct { enum ceph_osd_data_type type;
union {
struct { struct {
struct page **pages; struct page **pages;
u32 num_pages; u32 num_pages;
......
...@@ -122,7 +122,8 @@ void ceph_osdc_release_request(struct kref *kref) ...@@ -122,7 +122,8 @@ void ceph_osdc_release_request(struct kref *kref)
} }
if (req->r_reply) if (req->r_reply)
ceph_msg_put(req->r_reply); ceph_msg_put(req->r_reply);
if (req->r_data.own_pages) if (req->r_data.type == CEPH_OSD_DATA_TYPE_PAGES &&
req->r_data.own_pages)
ceph_release_page_vector(req->r_data.pages, ceph_release_page_vector(req->r_data.pages,
req->r_data.num_pages); req->r_data.num_pages);
ceph_put_snap_context(req->r_snapc); ceph_put_snap_context(req->r_snapc);
...@@ -188,6 +189,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, ...@@ -188,6 +189,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
} }
req->r_reply = msg; req->r_reply = msg;
req->r_data.type = CEPH_OSD_DATA_TYPE_NONE;
ceph_pagelist_init(&req->r_trail); ceph_pagelist_init(&req->r_trail);
/* create request message; allow space for oid */ /* create request message; allow space for oid */
...@@ -1739,12 +1741,17 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, ...@@ -1739,12 +1741,17 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
{ {
int rc = 0; int rc = 0;
req->r_request->pages = req->r_data.pages; if (req->r_data.type == CEPH_OSD_DATA_TYPE_PAGES) {
req->r_request->page_count = req->r_data.num_pages; req->r_request->pages = req->r_data.pages;
req->r_request->page_alignment = req->r_data.alignment; req->r_request->page_count = req->r_data.num_pages;
req->r_request->page_alignment = req->r_data.alignment;
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
req->r_request->bio = req->r_data.bio; } else if (req->r_data.type == CEPH_OSD_DATA_TYPE_BIO) {
req->r_request->bio = req->r_data.bio;
#endif #endif
} else {
pr_err("unknown request data type %d\n", req->r_data.type);
}
req->r_request->trail = &req->r_trail; req->r_request->trail = &req->r_trail;
register_request(osdc, req); register_request(osdc, req);
...@@ -1944,6 +1951,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, ...@@ -1944,6 +1951,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
return PTR_ERR(req); return PTR_ERR(req);
/* it may be a short read due to an object boundary */ /* it may be a short read due to an object boundary */
req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES;
req->r_data.pages = pages; req->r_data.pages = pages;
req->r_data.num_pages = calc_pages_for(page_align, *plen); req->r_data.num_pages = calc_pages_for(page_align, *plen);
req->r_data.alignment = page_align; req->r_data.alignment = page_align;
...@@ -1987,6 +1995,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, ...@@ -1987,6 +1995,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
return PTR_ERR(req); return PTR_ERR(req);
/* it may be a short write due to an object boundary */ /* it may be a short write due to an object boundary */
req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES;
req->r_data.pages = pages; req->r_data.pages = pages;
req->r_data.num_pages = calc_pages_for(page_align, len); req->r_data.num_pages = calc_pages_for(page_align, len);
req->r_data.alignment = page_align; req->r_data.alignment = page_align;
...@@ -2083,23 +2092,30 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, ...@@ -2083,23 +2092,30 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
m = ceph_msg_get(req->r_reply); m = ceph_msg_get(req->r_reply);
if (data_len > 0) { if (data_len > 0) {
int want = calc_pages_for(req->r_data.alignment, data_len); if (req->r_data.type == CEPH_OSD_DATA_TYPE_PAGES) {
int want;
if (req->r_data.pages && unlikely(req->r_data.num_pages < want)) {
pr_warning("tid %lld reply has %d bytes %d pages, we" want = calc_pages_for(req->r_data.alignment, data_len);
" had only %d pages ready\n", tid, data_len, if (req->r_data.pages &&
want, req->r_data.num_pages); unlikely(req->r_data.num_pages < want)) {
*skip = 1;
ceph_msg_put(m); pr_warning("tid %lld reply has %d bytes %d "
m = NULL; "pages, we had only %d pages ready\n",
goto out; tid, data_len, want,
} req->r_data.num_pages);
m->pages = req->r_data.pages; *skip = 1;
m->page_count = req->r_data.num_pages; ceph_msg_put(m);
m->page_alignment = req->r_data.alignment; m = NULL;
goto out;
}
m->pages = req->r_data.pages;
m->page_count = req->r_data.num_pages;
m->page_alignment = req->r_data.alignment;
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
m->bio = req->r_data.bio; } else if (req->r_data.type == CEPH_OSD_DATA_TYPE_BIO) {
m->bio = req->r_data.bio;
#endif #endif
}
} }
*skip = 0; *skip = 0;
req->r_con_filling_msg = con->ops->get(con); req->r_con_filling_msg = con->ops->get(con);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment