Commit b7495fc2 authored by Sage Weil

ceph: make page alignment explicit in osd interface

We used to infer alignment of IOs within a page based on the file offset,
which assumed they matched.  This broke with direct IO that was not aligned
to pages (e.g., 512-byte aligned IO).  We were also trusting the alignment
specified in the OSD reply, which could have been adjusted by the server.

Explicitly specify the page alignment when setting up OSD IO requests.

Signed-off-by: Sage Weil <sage@newdream.net>
parent e98b6fed
...@@ -204,7 +204,7 @@ static int readpage_nounlock(struct file *filp, struct page *page) ...@@ -204,7 +204,7 @@ static int readpage_nounlock(struct file *filp, struct page *page)
err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
page->index << PAGE_CACHE_SHIFT, &len, page->index << PAGE_CACHE_SHIFT, &len,
ci->i_truncate_seq, ci->i_truncate_size, ci->i_truncate_seq, ci->i_truncate_size,
&page, 1); &page, 1, 0);
if (err == -ENOENT) if (err == -ENOENT)
err = 0; err = 0;
if (err < 0) { if (err < 0) {
...@@ -287,7 +287,7 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, ...@@ -287,7 +287,7 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
offset, &len, offset, &len,
ci->i_truncate_seq, ci->i_truncate_size, ci->i_truncate_seq, ci->i_truncate_size,
pages, nr_pages); pages, nr_pages, 0);
if (rc == -ENOENT) if (rc == -ENOENT)
rc = 0; rc = 0;
if (rc < 0) if (rc < 0)
...@@ -782,7 +782,7 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -782,7 +782,7 @@ static int ceph_writepages_start(struct address_space *mapping,
snapc, do_sync, snapc, do_sync,
ci->i_truncate_seq, ci->i_truncate_seq,
ci->i_truncate_size, ci->i_truncate_size,
&inode->i_mtime, true, 1); &inode->i_mtime, true, 1, 0);
max_pages = req->r_num_pages; max_pages = req->r_num_pages;
alloc_page_vec(fsc, req); alloc_page_vec(fsc, req);
......
...@@ -282,11 +282,12 @@ int ceph_release(struct inode *inode, struct file *file) ...@@ -282,11 +282,12 @@ int ceph_release(struct inode *inode, struct file *file)
static int striped_read(struct inode *inode, static int striped_read(struct inode *inode,
u64 off, u64 len, u64 off, u64 len,
struct page **pages, int num_pages, struct page **pages, int num_pages,
int *checkeof) int *checkeof, bool align_to_pages)
{ {
struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
u64 pos, this_len; u64 pos, this_len;
int io_align, page_align;
int page_off = off & ~PAGE_CACHE_MASK; /* first byte's offset in page */ int page_off = off & ~PAGE_CACHE_MASK; /* first byte's offset in page */
int left, pages_left; int left, pages_left;
int read; int read;
...@@ -302,14 +303,19 @@ static int striped_read(struct inode *inode, ...@@ -302,14 +303,19 @@ static int striped_read(struct inode *inode,
page_pos = pages; page_pos = pages;
pages_left = num_pages; pages_left = num_pages;
read = 0; read = 0;
io_align = off & ~PAGE_MASK;
more: more:
if (align_to_pages)
page_align = (pos - io_align) & ~PAGE_MASK;
else
page_align = pos & ~PAGE_MASK;
this_len = left; this_len = left;
ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode), ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
&ci->i_layout, pos, &this_len, &ci->i_layout, pos, &this_len,
ci->i_truncate_seq, ci->i_truncate_seq,
ci->i_truncate_size, ci->i_truncate_size,
page_pos, pages_left); page_pos, pages_left, page_align);
hit_stripe = this_len < left; hit_stripe = this_len < left;
was_short = ret >= 0 && ret < this_len; was_short = ret >= 0 && ret < this_len;
if (ret == -ENOENT) if (ret == -ENOENT)
...@@ -393,7 +399,8 @@ static ssize_t ceph_sync_read(struct file *file, char __user *data, ...@@ -393,7 +399,8 @@ static ssize_t ceph_sync_read(struct file *file, char __user *data,
if (ret < 0) if (ret < 0)
goto done; goto done;
ret = striped_read(inode, off, len, pages, num_pages, checkeof); ret = striped_read(inode, off, len, pages, num_pages, checkeof,
file->f_flags & O_DIRECT);
if (ret >= 0 && (file->f_flags & O_DIRECT) == 0) if (ret >= 0 && (file->f_flags & O_DIRECT) == 0)
ret = ceph_copy_page_vector_to_user(pages, data, off, ret); ret = ceph_copy_page_vector_to_user(pages, data, off, ret);
...@@ -448,6 +455,7 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, ...@@ -448,6 +455,7 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
int flags; int flags;
int do_sync = 0; int do_sync = 0;
int check_caps = 0; int check_caps = 0;
int page_align, io_align;
int ret; int ret;
struct timespec mtime = CURRENT_TIME; struct timespec mtime = CURRENT_TIME;
...@@ -462,6 +470,8 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, ...@@ -462,6 +470,8 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
else else
pos = *offset; pos = *offset;
io_align = pos & ~PAGE_MASK;
ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left); ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
if (ret < 0) if (ret < 0)
return ret; return ret;
...@@ -486,20 +496,26 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, ...@@ -486,20 +496,26 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
*/ */
more: more:
len = left; len = left;
if (file->f_flags & O_DIRECT)
/* write from beginning of first page, regardless of
io alignment */
page_align = (pos - io_align) & ~PAGE_MASK;
else
page_align = pos & ~PAGE_MASK;
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
ceph_vino(inode), pos, &len, ceph_vino(inode), pos, &len,
CEPH_OSD_OP_WRITE, flags, CEPH_OSD_OP_WRITE, flags,
ci->i_snap_realm->cached_context, ci->i_snap_realm->cached_context,
do_sync, do_sync,
ci->i_truncate_seq, ci->i_truncate_size, ci->i_truncate_seq, ci->i_truncate_size,
&mtime, false, 2); &mtime, false, 2, page_align);
if (!req) if (!req)
return -ENOMEM; return -ENOMEM;
num_pages = calc_pages_for(pos, len); num_pages = calc_pages_for(pos, len);
if (file->f_flags & O_DIRECT) { if (file->f_flags & O_DIRECT) {
pages = ceph_get_direct_page_vector(data, num_pages, pos, len); pages = ceph_get_direct_page_vector(data, num_pages);
if (IS_ERR(pages)) { if (IS_ERR(pages)) {
ret = PTR_ERR(pages); ret = PTR_ERR(pages);
goto out; goto out;
......
...@@ -1752,7 +1752,7 @@ int ceph_do_getattr(struct inode *inode, int mask) ...@@ -1752,7 +1752,7 @@ int ceph_do_getattr(struct inode *inode, int mask)
return 0; return 0;
} }
dout("do_getattr inode %p mask %s\n", inode, ceph_cap_string(mask)); dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode);
if (ceph_caps_issued_mask(ceph_inode(inode), mask, 1)) if (ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
return 0; return 0;
......
...@@ -79,6 +79,7 @@ struct ceph_osd_request { ...@@ -79,6 +79,7 @@ struct ceph_osd_request {
struct ceph_file_layout r_file_layout; struct ceph_file_layout r_file_layout;
struct ceph_snap_context *r_snapc; /* snap context for writes */ struct ceph_snap_context *r_snapc; /* snap context for writes */
unsigned r_num_pages; /* size of page array (follows) */ unsigned r_num_pages; /* size of page array (follows) */
unsigned r_page_alignment; /* io offset in first page */
struct page **r_pages; /* pages for data payload */ struct page **r_pages; /* pages for data payload */
int r_pages_from_pool; int r_pages_from_pool;
int r_own_pages; /* if true, i own page list */ int r_own_pages; /* if true, i own page list */
...@@ -194,7 +195,8 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, ...@@ -194,7 +195,8 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
int do_sync, u32 truncate_seq, int do_sync, u32 truncate_seq,
u64 truncate_size, u64 truncate_size,
struct timespec *mtime, struct timespec *mtime,
bool use_mempool, int num_reply); bool use_mempool, int num_reply,
int page_align);
static inline void ceph_osdc_get_request(struct ceph_osd_request *req) static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
{ {
...@@ -218,7 +220,8 @@ extern int ceph_osdc_readpages(struct ceph_osd_client *osdc, ...@@ -218,7 +220,8 @@ extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
struct ceph_file_layout *layout, struct ceph_file_layout *layout,
u64 off, u64 *plen, u64 off, u64 *plen,
u32 truncate_seq, u64 truncate_size, u32 truncate_seq, u64 truncate_size,
struct page **pages, int nr_pages); struct page **pages, int nr_pages,
int page_align);
extern int ceph_osdc_writepages(struct ceph_osd_client *osdc, extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
struct ceph_vino vino, struct ceph_vino vino,
......
...@@ -71,6 +71,7 @@ void ceph_calc_raw_layout(struct ceph_osd_client *osdc, ...@@ -71,6 +71,7 @@ void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
op->extent.length = objlen; op->extent.length = objlen;
} }
req->r_num_pages = calc_pages_for(off, *plen); req->r_num_pages = calc_pages_for(off, *plen);
req->r_page_alignment = off & ~PAGE_MASK;
if (op->op == CEPH_OSD_OP_WRITE) if (op->op == CEPH_OSD_OP_WRITE)
op->payload_len = *plen; op->payload_len = *plen;
...@@ -419,7 +420,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, ...@@ -419,7 +420,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
u32 truncate_seq, u32 truncate_seq,
u64 truncate_size, u64 truncate_size,
struct timespec *mtime, struct timespec *mtime,
bool use_mempool, int num_reply) bool use_mempool, int num_reply,
int page_align)
{ {
struct ceph_osd_req_op ops[3]; struct ceph_osd_req_op ops[3];
struct ceph_osd_request *req; struct ceph_osd_request *req;
...@@ -447,6 +449,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, ...@@ -447,6 +449,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
calc_layout(osdc, vino, layout, off, plen, req, ops); calc_layout(osdc, vino, layout, off, plen, req, ops);
req->r_file_layout = *layout; /* keep a copy */ req->r_file_layout = *layout; /* keep a copy */
/* in case it differs from natural alignment that calc_layout
filled in for us */
req->r_page_alignment = page_align;
ceph_osdc_build_request(req, off, plen, ops, ceph_osdc_build_request(req, off, plen, ops,
snapc, snapc,
mtime, mtime,
...@@ -1489,7 +1495,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, ...@@ -1489,7 +1495,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
struct ceph_vino vino, struct ceph_file_layout *layout, struct ceph_vino vino, struct ceph_file_layout *layout,
u64 off, u64 *plen, u64 off, u64 *plen,
u32 truncate_seq, u64 truncate_size, u32 truncate_seq, u64 truncate_size,
struct page **pages, int num_pages) struct page **pages, int num_pages, int page_align)
{ {
struct ceph_osd_request *req; struct ceph_osd_request *req;
int rc = 0; int rc = 0;
...@@ -1499,15 +1505,15 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, ...@@ -1499,15 +1505,15 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
req = ceph_osdc_new_request(osdc, layout, vino, off, plen, req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
NULL, 0, truncate_seq, truncate_size, NULL, NULL, 0, truncate_seq, truncate_size, NULL,
false, 1); false, 1, page_align);
if (!req) if (!req)
return -ENOMEM; return -ENOMEM;
/* it may be a short read due to an object boundary */ /* it may be a short read due to an object boundary */
req->r_pages = pages; req->r_pages = pages;
dout("readpages final extent is %llu~%llu (%d pages)\n", dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
off, *plen, req->r_num_pages); off, *plen, req->r_num_pages, page_align);
rc = ceph_osdc_start_request(osdc, req, false); rc = ceph_osdc_start_request(osdc, req, false);
if (!rc) if (!rc)
...@@ -1533,6 +1539,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, ...@@ -1533,6 +1539,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
{ {
struct ceph_osd_request *req; struct ceph_osd_request *req;
int rc = 0; int rc = 0;
int page_align = off & ~PAGE_MASK;
BUG_ON(vino.snap != CEPH_NOSNAP); BUG_ON(vino.snap != CEPH_NOSNAP);
req = ceph_osdc_new_request(osdc, layout, vino, off, &len, req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
...@@ -1541,7 +1548,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, ...@@ -1541,7 +1548,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
CEPH_OSD_FLAG_WRITE, CEPH_OSD_FLAG_WRITE,
snapc, do_sync, snapc, do_sync,
truncate_seq, truncate_size, mtime, truncate_seq, truncate_size, mtime,
nofail, 1); nofail, 1, page_align);
if (!req) if (!req)
return -ENOMEM; return -ENOMEM;
...@@ -1638,8 +1645,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, ...@@ -1638,8 +1645,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
m = ceph_msg_get(req->r_reply); m = ceph_msg_get(req->r_reply);
if (data_len > 0) { if (data_len > 0) {
unsigned data_off = le16_to_cpu(hdr->data_off); int want = calc_pages_for(req->r_page_alignment, data_len);
int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
if (unlikely(req->r_num_pages < want)) { if (unlikely(req->r_num_pages < want)) {
pr_warning("tid %lld reply %d > expected %d pages\n", pr_warning("tid %lld reply %d > expected %d pages\n",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment