Commit fc2744aa authored by Yan, Zheng's avatar Yan, Zheng Committed by Sage Weil

ceph: fix race between page writeback and truncate

The client can receive truncate request from MDS at any time.
So the page writeback code need to get i_size, truncate_seq and
truncate_size atomically
Signed-off-by: default avatarYan, Zheng <zheng.z.yan@intel.com>
Reviewed-by: default avatarSage Weil <sage@inktank.com>
parent 3803da49
...@@ -438,13 +438,12 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) ...@@ -438,13 +438,12 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
struct ceph_inode_info *ci; struct ceph_inode_info *ci;
struct ceph_fs_client *fsc; struct ceph_fs_client *fsc;
struct ceph_osd_client *osdc; struct ceph_osd_client *osdc;
loff_t page_off = page_offset(page);
int len = PAGE_CACHE_SIZE;
loff_t i_size;
int err = 0;
struct ceph_snap_context *snapc, *oldest; struct ceph_snap_context *snapc, *oldest;
u64 snap_size = 0; loff_t page_off = page_offset(page);
long writeback_stat; long writeback_stat;
u64 truncate_size, snap_size = 0;
u32 truncate_seq;
int err = 0, len = PAGE_CACHE_SIZE;
dout("writepage %p idx %lu\n", page, page->index); dout("writepage %p idx %lu\n", page, page->index);
...@@ -474,13 +473,20 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) ...@@ -474,13 +473,20 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
} }
ceph_put_snap_context(oldest); ceph_put_snap_context(oldest);
spin_lock(&ci->i_ceph_lock);
truncate_seq = ci->i_truncate_seq;
truncate_size = ci->i_truncate_size;
if (!snap_size)
snap_size = i_size_read(inode);
spin_unlock(&ci->i_ceph_lock);
/* is this a partial page at end of file? */ /* is this a partial page at end of file? */
if (snap_size) if (page_off >= snap_size) {
i_size = snap_size; dout("%p page eof %llu\n", page, snap_size);
else goto out;
i_size = i_size_read(inode); }
if (i_size < page_off + len) if (snap_size < page_off + len)
len = i_size - page_off; len = snap_size - page_off;
dout("writepage %p page %p index %lu on %llu~%u snapc %p\n", dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
inode, page, page->index, page_off, len, snapc); inode, page, page->index, page_off, len, snapc);
...@@ -494,7 +500,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) ...@@ -494,7 +500,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
err = ceph_osdc_writepages(osdc, ceph_vino(inode), err = ceph_osdc_writepages(osdc, ceph_vino(inode),
&ci->i_layout, snapc, &ci->i_layout, snapc,
page_off, len, page_off, len,
ci->i_truncate_seq, ci->i_truncate_size, truncate_seq, truncate_size,
&inode->i_mtime, &page, 1); &inode->i_mtime, &page, 1);
if (err < 0) { if (err < 0) {
dout("writepage setting page/mapping error %d %p\n", err, page); dout("writepage setting page/mapping error %d %p\n", err, page);
...@@ -631,25 +637,6 @@ static void writepages_finish(struct ceph_osd_request *req, ...@@ -631,25 +637,6 @@ static void writepages_finish(struct ceph_osd_request *req,
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
} }
static struct ceph_osd_request *
ceph_writepages_osd_request(struct inode *inode, u64 offset, u64 *len,
struct ceph_snap_context *snapc, int num_ops)
{
struct ceph_fs_client *fsc;
struct ceph_inode_info *ci;
struct ceph_vino vino;
fsc = ceph_inode_to_client(inode);
ci = ceph_inode(inode);
vino = ceph_vino(inode);
/* BUG_ON(vino.snap != CEPH_NOSNAP); */
return ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
vino, offset, len, num_ops, CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK,
snapc, ci->i_truncate_seq, ci->i_truncate_size, true);
}
/* /*
* initiate async writeback * initiate async writeback
*/ */
...@@ -658,7 +645,8 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -658,7 +645,8 @@ static int ceph_writepages_start(struct address_space *mapping,
{ {
struct inode *inode = mapping->host; struct inode *inode = mapping->host;
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc; struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_vino vino = ceph_vino(inode);
pgoff_t index, start, end; pgoff_t index, start, end;
int range_whole = 0; int range_whole = 0;
int should_loop = 1; int should_loop = 1;
...@@ -670,7 +658,8 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -670,7 +658,8 @@ static int ceph_writepages_start(struct address_space *mapping,
unsigned wsize = 1 << inode->i_blkbits; unsigned wsize = 1 << inode->i_blkbits;
struct ceph_osd_request *req = NULL; struct ceph_osd_request *req = NULL;
int do_sync; int do_sync;
u64 snap_size; u64 truncate_size, snap_size;
u32 truncate_seq;
/* /*
* Include a 'sync' in the OSD request if this is a data * Include a 'sync' in the OSD request if this is a data
...@@ -685,7 +674,6 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -685,7 +674,6 @@ static int ceph_writepages_start(struct address_space *mapping,
wbc->sync_mode == WB_SYNC_NONE ? "NONE" : wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
(wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD")); (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
fsc = ceph_inode_to_client(inode);
if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) { if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) {
pr_warning("writepage_start %p on forced umount\n", inode); pr_warning("writepage_start %p on forced umount\n", inode);
return -EIO; /* we're in a forced umount, don't write! */ return -EIO; /* we're in a forced umount, don't write! */
...@@ -728,6 +716,14 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -728,6 +716,14 @@ static int ceph_writepages_start(struct address_space *mapping,
snap_size = i_size_read(inode); snap_size = i_size_read(inode);
dout(" oldest snapc is %p seq %lld (%d snaps)\n", dout(" oldest snapc is %p seq %lld (%d snaps)\n",
snapc, snapc->seq, snapc->num_snaps); snapc, snapc->seq, snapc->num_snaps);
spin_lock(&ci->i_ceph_lock);
truncate_seq = ci->i_truncate_seq;
truncate_size = ci->i_truncate_size;
if (!snap_size)
snap_size = i_size_read(inode);
spin_unlock(&ci->i_ceph_lock);
if (last_snapc && snapc != last_snapc) { if (last_snapc && snapc != last_snapc) {
/* if we switched to a newer snapc, restart our scan at the /* if we switched to a newer snapc, restart our scan at the
* start of the original file range. */ * start of the original file range. */
...@@ -739,7 +735,6 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -739,7 +735,6 @@ static int ceph_writepages_start(struct address_space *mapping,
while (!done && index <= end) { while (!done && index <= end) {
int num_ops = do_sync ? 2 : 1; int num_ops = do_sync ? 2 : 1;
struct ceph_vino vino;
unsigned i; unsigned i;
int first; int first;
pgoff_t next; pgoff_t next;
...@@ -833,17 +828,18 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -833,17 +828,18 @@ static int ceph_writepages_start(struct address_space *mapping,
* that it will use. * that it will use.
*/ */
if (locked_pages == 0) { if (locked_pages == 0) {
size_t size;
BUG_ON(pages); BUG_ON(pages);
/* prepare async write request */ /* prepare async write request */
offset = (u64)page_offset(page); offset = (u64)page_offset(page);
len = wsize; len = wsize;
req = ceph_writepages_osd_request(inode, req = ceph_osdc_new_request(&fsc->client->osdc,
offset, &len, snapc, &ci->i_layout, vino,
num_ops); offset, &len, num_ops,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_WRITE |
CEPH_OSD_FLAG_ONDISK,
snapc, truncate_seq,
truncate_size, true);
if (IS_ERR(req)) { if (IS_ERR(req)) {
rc = PTR_ERR(req); rc = PTR_ERR(req);
unlock_page(page); unlock_page(page);
...@@ -854,8 +850,8 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -854,8 +850,8 @@ static int ceph_writepages_start(struct address_space *mapping,
req->r_inode = inode; req->r_inode = inode;
max_pages = calc_pages_for(0, (u64)len); max_pages = calc_pages_for(0, (u64)len);
size = max_pages * sizeof (*pages); pages = kmalloc(max_pages * sizeof (*pages),
pages = kmalloc(size, GFP_NOFS); GFP_NOFS);
if (!pages) { if (!pages) {
pool = fsc->wb_pagevec_pool; pool = fsc->wb_pagevec_pool;
pages = mempool_alloc(pool, GFP_NOFS); pages = mempool_alloc(pool, GFP_NOFS);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment