Commit 7971bd92 authored by Sage Weil's avatar Sage Weil

ceph: revert commit 22cddde1

commit 22cddde1 breaks the atomicity of write operation, it also
introduces a deadlock between write and truncate.
Signed-off-by: default avatarYan, Zheng <zheng.z.yan@intel.com>
Reviewed-by: default avatarGreg Farnum <greg@inktank.com>

Conflicts:
	fs/ceph/addr.c
parent a8673d61
...@@ -1067,51 +1067,23 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping, ...@@ -1067,51 +1067,23 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
struct page **pagep, void **fsdata) struct page **pagep, void **fsdata)
{ {
struct inode *inode = file_inode(file); struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_file_info *fi = file->private_data;
struct page *page; struct page *page;
pgoff_t index = pos >> PAGE_CACHE_SHIFT; pgoff_t index = pos >> PAGE_CACHE_SHIFT;
int r, want, got = 0; int r;
if (fi->fmode & CEPH_FILE_MODE_LAZY)
want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_BUFFER;
dout("write_begin %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
inode, ceph_vinop(inode), pos, len, inode->i_size);
r = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos+len);
if (r < 0)
return r;
dout("write_begin %p %llx.%llx %llu~%u got cap refs on %s\n",
inode, ceph_vinop(inode), pos, len, ceph_cap_string(got));
if (!(got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO))) {
ceph_put_cap_refs(ci, got);
return -EAGAIN;
}
do { do {
/* get a page */ /* get a page */
page = grab_cache_page_write_begin(mapping, index, 0); page = grab_cache_page_write_begin(mapping, index, 0);
if (!page) { if (!page)
r = -ENOMEM; return -ENOMEM;
break; *pagep = page;
}
dout("write_begin file %p inode %p page %p %d~%d\n", file, dout("write_begin file %p inode %p page %p %d~%d\n", file,
inode, page, (int)pos, (int)len); inode, page, (int)pos, (int)len);
r = ceph_update_writeable_page(file, pos, len, page); r = ceph_update_writeable_page(file, pos, len, page);
if (r)
page_cache_release(page);
} while (r == -EAGAIN); } while (r == -EAGAIN);
if (r) {
ceph_put_cap_refs(ci, got);
} else {
*pagep = page;
*(int *)fsdata = got;
}
return r; return r;
} }
...@@ -1125,12 +1097,10 @@ static int ceph_write_end(struct file *file, struct address_space *mapping, ...@@ -1125,12 +1097,10 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
struct page *page, void *fsdata) struct page *page, void *fsdata)
{ {
struct inode *inode = file_inode(file); struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_client *mdsc = fsc->mdsc;
unsigned from = pos & (PAGE_CACHE_SIZE - 1); unsigned from = pos & (PAGE_CACHE_SIZE - 1);
int check_cap = 0; int check_cap = 0;
int got = (unsigned long)fsdata;
dout("write_end file %p inode %p page %p %d~%d (%d)\n", file, dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
inode, page, (int)pos, (int)copied, (int)len); inode, page, (int)pos, (int)copied, (int)len);
...@@ -1153,19 +1123,6 @@ static int ceph_write_end(struct file *file, struct address_space *mapping, ...@@ -1153,19 +1123,6 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
up_read(&mdsc->snap_rwsem); up_read(&mdsc->snap_rwsem);
page_cache_release(page); page_cache_release(page);
if (copied > 0) {
int dirty;
spin_lock(&ci->i_ceph_lock);
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
spin_unlock(&ci->i_ceph_lock);
if (dirty)
__mark_inode_dirty(inode, dirty);
}
dout("write_end %p %llx.%llx %llu~%u dropping cap refs on %s\n",
inode, ceph_vinop(inode), pos, len, ceph_cap_string(got));
ceph_put_cap_refs(ci, got);
if (check_cap) if (check_cap)
ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL); ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);
......
...@@ -718,53 +718,63 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov, ...@@ -718,53 +718,63 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
struct ceph_osd_client *osdc = struct ceph_osd_client *osdc =
&ceph_sb_to_client(inode->i_sb)->client->osdc; &ceph_sb_to_client(inode->i_sb)->client->osdc;
loff_t endoff = pos + iov->iov_len; loff_t endoff = pos + iov->iov_len;
int got = 0; int want, got = 0;
int ret, err, written; int ret, err;
if (ceph_snap(inode) != CEPH_NOSNAP) if (ceph_snap(inode) != CEPH_NOSNAP)
return -EROFS; return -EROFS;
retry_snap: retry_snap:
written = 0;
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
return -ENOSPC; return -ENOSPC;
__ceph_do_pending_vmtruncate(inode); __ceph_do_pending_vmtruncate(inode);
dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
inode->i_size);
if (fi->fmode & CEPH_FILE_MODE_LAZY)
want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_BUFFER;
ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
if (ret < 0)
goto out_put;
/* dout("aio_write %p %llx.%llx %llu~%u got cap refs on %s\n",
* try to do a buffered write. if we don't have sufficient inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
* caps, we'll get -EAGAIN from generic_file_aio_write, or a ceph_cap_string(got));
* short write if we only get caps for some pages.
*/ if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
if (!(iocb->ki_filp->f_flags & O_DIRECT) && (iocb->ki_filp->f_flags & O_DIRECT) ||
!(inode->i_sb->s_flags & MS_SYNCHRONOUS) && (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
!(fi->flags & CEPH_F_SYNC)) { (fi->flags & CEPH_F_SYNC)) {
ret = generic_file_aio_write(iocb, iov, nr_segs, pos); ret = ceph_sync_write(file, iov->iov_base, iov->iov_len,
if (ret >= 0) &iocb->ki_pos);
written = ret; } else {
/*
* buffered write; drop Fw early to avoid slow
* revocation if we get stuck on balance_dirty_pages
*/
int dirty;
spin_lock(&ci->i_ceph_lock);
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
spin_unlock(&ci->i_ceph_lock);
ceph_put_cap_refs(ci, got);
ret = generic_file_aio_write(iocb, iov, nr_segs, pos);
if ((ret >= 0 || ret == -EIOCBQUEUED) && if ((ret >= 0 || ret == -EIOCBQUEUED) &&
((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host) ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host)
|| ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) { || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) {
err = vfs_fsync_range(file, pos, pos + written - 1, 1); err = vfs_fsync_range(file, pos, pos + ret - 1, 1);
if (err < 0) if (err < 0)
ret = err; ret = err;
} }
if ((ret < 0 && ret != -EAGAIN) || pos + written >= endoff)
goto out;
}
dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n", if (dirty)
inode, ceph_vinop(inode), pos + written, __mark_inode_dirty(inode, dirty);
(unsigned)iov->iov_len - written, inode->i_size);
ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, 0, &got, endoff);
if (ret < 0)
goto out; goto out;
}
dout("aio_write %p %llx.%llx %llu~%u got cap refs on %s\n",
inode, ceph_vinop(inode), pos + written,
(unsigned)iov->iov_len - written, ceph_cap_string(got));
ret = ceph_sync_write(file, iov->iov_base + written,
iov->iov_len - written, &iocb->ki_pos);
if (ret >= 0) { if (ret >= 0) {
int dirty; int dirty;
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
...@@ -773,10 +783,13 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov, ...@@ -773,10 +783,13 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
if (dirty) if (dirty)
__mark_inode_dirty(inode, dirty); __mark_inode_dirty(inode, dirty);
} }
out_put:
dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n", dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n",
inode, ceph_vinop(inode), pos + written, inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
(unsigned)iov->iov_len - written, ceph_cap_string(got)); ceph_cap_string(got));
ceph_put_cap_refs(ci, got); ceph_put_cap_refs(ci, got);
out: out:
if (ret == -EOLDSNAPC) { if (ret == -EOLDSNAPC) {
dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n", dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n",
......
...@@ -1916,6 +1916,7 @@ static void __wake_requests(struct ceph_mds_client *mdsc, ...@@ -1916,6 +1916,7 @@ static void __wake_requests(struct ceph_mds_client *mdsc,
req = list_entry(tmp_list.next, req = list_entry(tmp_list.next,
struct ceph_mds_request, r_wait); struct ceph_mds_request, r_wait);
list_del_init(&req->r_wait); list_del_init(&req->r_wait);
dout(" wake request %p tid %llu\n", req, req->r_tid);
__do_request(mdsc, req); __do_request(mdsc, req);
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment