Commit 85c7000f authored by Linus Torvalds

Merge tag 'ceph-for-5.18-rc1' of https://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "The highlights are:

   - several changes to how snap context and snap realms are tracked
     (Xiubo Li). In particular, this should resolve a long-standing
     issue of high kworker CPU usage and various stalls caused by
     needless iteration over all inodes in the snap realm.

   - async create fixes to address hangs in some edge cases (Jeff
     Layton)

   - support for getvxattr MDS op for querying server-side xattrs, such
     as file/directory layouts and ephemeral pins (Milind Changire)

   - average latency is now maintained for all metrics (Venky Shankar)

   - some tweaks around handling inline data to make it fit better with
     netfs helper library (David Howells)

  Also a couple of memory leaks got plugged along with a few assorted
  fixups. Last but not least, Xiubo has stepped up to serve as a CephFS
  co-maintainer"

* tag 'ceph-for-5.18-rc1' of https://github.com/ceph/ceph-client: (27 commits)
  ceph: fix memory leak in ceph_readdir when note_last_dentry returns error
  ceph: uninitialized variable in debug output
  ceph: use tracked average r/w/m latencies to display metrics in debugfs
  ceph: include average/stdev r/w/m latency in mds metrics
  ceph: track average r/w/m latency
  ceph: use ktime_to_timespec64() rather than jiffies_to_timespec64()
  ceph: assign the ci only when the inode isn't NULL
  ceph: fix inode reference leakage in ceph_get_snapdir()
  ceph: misc fix for code style and logs
  ceph: allocate capsnap memory outside of ceph_queue_cap_snap()
  ceph: do not release the global snaprealm until unmounting
  ceph: remove incorrect and unused CEPH_INO_DOTDOT macro
  MAINTAINERS: add Xiubo Li as cephfs co-maintainer
  ceph: eliminate the recursion when rebuilding the snap context
  ceph: do not update snapshot context when there is no new snapshot
  ceph: zero the dir_entries memory when allocating it
  ceph: move to a dedicated slabcache for ceph_cap_snap
  ceph: add getvxattr op
  libceph: drop else branches in prepare_read_data{,_cont}
  ceph: fix comments mentioning i_mutex
  ...
parents b1b07ba3 f639d986
...@@ -4456,6 +4456,7 @@ F: drivers/power/supply/cw2015_battery.c ...@@ -4456,6 +4456,7 @@ F: drivers/power/supply/cw2015_battery.c
CEPH COMMON CODE (LIBCEPH) CEPH COMMON CODE (LIBCEPH)
M: Ilya Dryomov <idryomov@gmail.com> M: Ilya Dryomov <idryomov@gmail.com>
M: Jeff Layton <jlayton@kernel.org> M: Jeff Layton <jlayton@kernel.org>
M: Xiubo Li <xiubli@redhat.com>
L: ceph-devel@vger.kernel.org L: ceph-devel@vger.kernel.org
S: Supported S: Supported
W: http://ceph.com/ W: http://ceph.com/
...@@ -4466,6 +4467,7 @@ F: net/ceph/ ...@@ -4466,6 +4467,7 @@ F: net/ceph/
CEPH DISTRIBUTED FILE SYSTEM CLIENT (CEPH) CEPH DISTRIBUTED FILE SYSTEM CLIENT (CEPH)
M: Jeff Layton <jlayton@kernel.org> M: Jeff Layton <jlayton@kernel.org>
M: Xiubo Li <xiubli@redhat.com>
M: Ilya Dryomov <idryomov@gmail.com> M: Ilya Dryomov <idryomov@gmail.com>
L: ceph-devel@vger.kernel.org L: ceph-devel@vger.kernel.org
S: Supported S: Supported
......
...@@ -184,7 +184,7 @@ static int ceph_releasepage(struct page *page, gfp_t gfp) ...@@ -184,7 +184,7 @@ static int ceph_releasepage(struct page *page, gfp_t gfp)
static void ceph_netfs_expand_readahead(struct netfs_read_request *rreq) static void ceph_netfs_expand_readahead(struct netfs_read_request *rreq)
{ {
struct inode *inode = rreq->mapping->host; struct inode *inode = rreq->inode;
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_file_layout *lo = &ci->i_layout; struct ceph_file_layout *lo = &ci->i_layout;
u32 blockoff; u32 blockoff;
...@@ -201,7 +201,7 @@ static void ceph_netfs_expand_readahead(struct netfs_read_request *rreq) ...@@ -201,7 +201,7 @@ static void ceph_netfs_expand_readahead(struct netfs_read_request *rreq)
static bool ceph_netfs_clamp_length(struct netfs_read_subrequest *subreq) static bool ceph_netfs_clamp_length(struct netfs_read_subrequest *subreq)
{ {
struct inode *inode = subreq->rreq->mapping->host; struct inode *inode = subreq->rreq->inode;
struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
u64 objno, objoff; u64 objno, objoff;
...@@ -244,10 +244,63 @@ static void finish_netfs_read(struct ceph_osd_request *req) ...@@ -244,10 +244,63 @@ static void finish_netfs_read(struct ceph_osd_request *req)
iput(req->r_inode); iput(req->r_inode);
} }
/*
 * Try to satisfy a netfs read subrequest from the file's inline data.
 *
 * Sends a GETATTR for CEPH_STAT_CAP_INLINE_DATA to the MDS and copies the
 * requested range of the returned inline data into the mapping's pages.
 *
 * Returns true when the subrequest has been terminated here (with the number
 * of bytes copied or a negative error).  Returns false when the MDS reply
 * says the data is no longer inline; in that case the subrequest is NOT
 * terminated and the caller has to service it itself.
 */
static bool ceph_netfs_issue_op_inline(struct netfs_read_subrequest *subreq)
{
	struct netfs_read_request *rreq = subreq->rreq;
	struct inode *inode = rreq->inode;
	struct ceph_mds_reply_info_parsed *rinfo;
	struct ceph_mds_reply_info_in *iinfo;
	struct ceph_mds_request *req;
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct iov_iter iter;
	ssize_t err = 0;
	size_t len;

	__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
	__clear_bit(NETFS_SREQ_WRITE_TO_CACHE, &subreq->flags);

	/* Nothing to fetch at or beyond EOF: terminate with 0 bytes. */
	if (subreq->start >= inode->i_size)
		goto out;

	/* We need to fetch the inline data. */
	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}
	req->r_ino1 = ci->i_vino;
	req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INLINE_DATA);
	req->r_num_caps = 2;

	err = ceph_mdsc_do_request(mdsc, NULL, req);
	if (err < 0)
		goto out_put;	/* we still own a reference on req */

	rinfo = &req->r_reply_info;
	iinfo = &rinfo->targeti;
	if (iinfo->inline_version == CEPH_INLINE_NONE) {
		/* The data got uninlined */
		ceph_mdsc_put_request(req);
		return false;
	}

	/*
	 * NOTE(review): if subreq->start can be larger than
	 * iinfo->inline_len this subtraction wraps and len falls back to
	 * subreq->len -- confirm that start < inline_len is guaranteed.
	 */
	len = min_t(size_t, iinfo->inline_len - subreq->start, subreq->len);
	iov_iter_xarray(&iter, READ, &rreq->mapping->i_pages, subreq->start, len);
	err = copy_to_iter(iinfo->inline_data + subreq->start, len, &iter);
	/* A copy that moved no bytes is reported as a fault. */
	if (err == 0)
		err = -EFAULT;

out_put:
	ceph_mdsc_put_request(req);
out:
	netfs_subreq_terminated(subreq, err, false);
	return true;
}
static void ceph_netfs_issue_op(struct netfs_read_subrequest *subreq) static void ceph_netfs_issue_op(struct netfs_read_subrequest *subreq)
{ {
struct netfs_read_request *rreq = subreq->rreq; struct netfs_read_request *rreq = subreq->rreq;
struct inode *inode = rreq->mapping->host; struct inode *inode = rreq->inode;
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_osd_request *req; struct ceph_osd_request *req;
...@@ -258,6 +311,10 @@ static void ceph_netfs_issue_op(struct netfs_read_subrequest *subreq) ...@@ -258,6 +311,10 @@ static void ceph_netfs_issue_op(struct netfs_read_subrequest *subreq)
int err = 0; int err = 0;
u64 len = subreq->len; u64 len = subreq->len;
if (ci->i_inline_version != CEPH_INLINE_NONE &&
ceph_netfs_issue_op_inline(subreq))
return;
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, subreq->start, &len, req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, subreq->start, &len,
0, 1, CEPH_OSD_OP_READ, 0, 1, CEPH_OSD_OP_READ,
CEPH_OSD_FLAG_READ | fsc->client->osdc.client->options->read_from_replica, CEPH_OSD_FLAG_READ | fsc->client->osdc.client->options->read_from_replica,
...@@ -326,23 +383,9 @@ static int ceph_readpage(struct file *file, struct page *subpage) ...@@ -326,23 +383,9 @@ static int ceph_readpage(struct file *file, struct page *subpage)
size_t len = folio_size(folio); size_t len = folio_size(folio);
u64 off = folio_file_pos(folio); u64 off = folio_file_pos(folio);
if (ci->i_inline_version != CEPH_INLINE_NONE) { dout("readpage ino %llx.%llx file %p off %llu len %zu folio %p index %lu\n inline %d",
/* vino.ino, vino.snap, file, off, len, folio, folio_index(folio),
* Uptodate inline data should have been added ci->i_inline_version != CEPH_INLINE_NONE);
* into page cache while getting Fcr caps.
*/
if (off == 0) {
folio_unlock(folio);
return -EINVAL;
}
zero_user_segment(&folio->page, 0, folio_size(folio));
folio_mark_uptodate(folio);
folio_unlock(folio);
return 0;
}
dout("readpage ino %llx.%llx file %p off %llu len %zu folio %p index %lu\n",
vino.ino, vino.snap, file, off, len, folio, folio_index(folio));
return netfs_readpage(file, folio, &ceph_netfs_read_ops, NULL); return netfs_readpage(file, folio, &ceph_netfs_read_ops, NULL);
} }
...@@ -1281,45 +1324,11 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping, ...@@ -1281,45 +1324,11 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
struct page **pagep, void **fsdata) struct page **pagep, void **fsdata)
{ {
struct inode *inode = file_inode(file); struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct folio *folio = NULL; struct folio *folio = NULL;
pgoff_t index = pos >> PAGE_SHIFT;
int r; int r;
/*
* Uninlining should have already been done and everything updated, EXCEPT
* for inline_version sent to the MDS.
*/
if (ci->i_inline_version != CEPH_INLINE_NONE) {
unsigned int fgp_flags = FGP_LOCK | FGP_WRITE | FGP_CREAT | FGP_STABLE;
if (aop_flags & AOP_FLAG_NOFS)
fgp_flags |= FGP_NOFS;
folio = __filemap_get_folio(mapping, index, fgp_flags,
mapping_gfp_mask(mapping));
if (!folio)
return -ENOMEM;
/*
* The inline_version on a new inode is set to 1. If that's the
* case, then the folio is brand new and isn't yet Uptodate.
*/
r = 0;
if (index == 0 && ci->i_inline_version != 1) {
if (!folio_test_uptodate(folio)) {
WARN_ONCE(1, "ceph: write_begin called on still-inlined inode (inline_version %llu)!\n",
ci->i_inline_version);
r = -EINVAL;
}
goto out;
}
zero_user_segment(&folio->page, 0, folio_size(folio));
folio_mark_uptodate(folio);
goto out;
}
r = netfs_write_begin(file, inode->i_mapping, pos, len, 0, &folio, NULL, r = netfs_write_begin(file, inode->i_mapping, pos, len, 0, &folio, NULL,
&ceph_netfs_read_ops, NULL); &ceph_netfs_read_ops, NULL);
out:
if (r == 0) if (r == 0)
folio_wait_fscache(folio); folio_wait_fscache(folio);
if (r < 0) { if (r < 0) {
...@@ -1515,19 +1524,6 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf) ...@@ -1515,19 +1524,6 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
sb_start_pagefault(inode->i_sb); sb_start_pagefault(inode->i_sb);
ceph_block_sigs(&oldset); ceph_block_sigs(&oldset);
if (ci->i_inline_version != CEPH_INLINE_NONE) {
struct page *locked_page = NULL;
if (off == 0) {
lock_page(page);
locked_page = page;
}
err = ceph_uninline_data(vma->vm_file, locked_page);
if (locked_page)
unlock_page(locked_page);
if (err < 0)
goto out_free;
}
if (off + thp_size(page) <= size) if (off + thp_size(page) <= size)
len = thp_size(page); len = thp_size(page);
else else
...@@ -1584,11 +1580,9 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf) ...@@ -1584,11 +1580,9 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
ceph_put_snap_context(snapc); ceph_put_snap_context(snapc);
} while (err == 0); } while (err == 0);
if (ret == VM_FAULT_LOCKED || if (ret == VM_FAULT_LOCKED) {
ci->i_inline_version != CEPH_INLINE_NONE) {
int dirty; int dirty;
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
&prealloc_cf); &prealloc_cf);
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
...@@ -1652,16 +1646,30 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, ...@@ -1652,16 +1646,30 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
} }
} }
int ceph_uninline_data(struct file *filp, struct page *locked_page) int ceph_uninline_data(struct file *file)
{ {
struct inode *inode = file_inode(filp); struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_osd_request *req; struct ceph_osd_request *req;
struct page *page = NULL; struct ceph_cap_flush *prealloc_cf;
u64 len, inline_version; struct folio *folio = NULL;
u64 inline_version = CEPH_INLINE_NONE;
struct page *pages[1];
int err = 0; int err = 0;
bool from_pagecache = false; u64 len;
prealloc_cf = ceph_alloc_cap_flush();
if (!prealloc_cf)
return -ENOMEM;
folio = read_mapping_folio(inode->i_mapping, 0, file);
if (IS_ERR(folio)) {
err = PTR_ERR(folio);
goto out;
}
folio_lock(folio);
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
inline_version = ci->i_inline_version; inline_version = ci->i_inline_version;
...@@ -1672,45 +1680,11 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) ...@@ -1672,45 +1680,11 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
if (inline_version == 1 || /* initial version, no data */ if (inline_version == 1 || /* initial version, no data */
inline_version == CEPH_INLINE_NONE) inline_version == CEPH_INLINE_NONE)
goto out; goto out_unlock;
if (locked_page) {
page = locked_page;
WARN_ON(!PageUptodate(page));
} else if (ceph_caps_issued(ci) &
(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
page = find_get_page(inode->i_mapping, 0);
if (page) {
if (PageUptodate(page)) {
from_pagecache = true;
lock_page(page);
} else {
put_page(page);
page = NULL;
}
}
}
if (page) { len = i_size_read(inode);
len = i_size_read(inode); if (len > folio_size(folio))
if (len > PAGE_SIZE) len = folio_size(folio);
len = PAGE_SIZE;
} else {
page = __page_cache_alloc(GFP_NOFS);
if (!page) {
err = -ENOMEM;
goto out;
}
err = __ceph_do_getattr(inode, page,
CEPH_STAT_CAP_INLINE_DATA, true);
if (err < 0) {
/* no inline data */
if (err == -ENODATA)
err = 0;
goto out;
}
len = err;
}
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
ceph_vino(inode), 0, &len, 0, 1, ceph_vino(inode), 0, &len, 0, 1,
...@@ -1718,7 +1692,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) ...@@ -1718,7 +1692,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
NULL, 0, 0, false); NULL, 0, 0, false);
if (IS_ERR(req)) { if (IS_ERR(req)) {
err = PTR_ERR(req); err = PTR_ERR(req);
goto out; goto out_unlock;
} }
req->r_mtime = inode->i_mtime; req->r_mtime = inode->i_mtime;
...@@ -1727,7 +1701,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) ...@@ -1727,7 +1701,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
err = ceph_osdc_wait_request(&fsc->client->osdc, req); err = ceph_osdc_wait_request(&fsc->client->osdc, req);
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
if (err < 0) if (err < 0)
goto out; goto out_unlock;
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
ceph_vino(inode), 0, &len, 1, 3, ceph_vino(inode), 0, &len, 1, 3,
...@@ -1736,10 +1710,11 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) ...@@ -1736,10 +1710,11 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
ci->i_truncate_size, false); ci->i_truncate_size, false);
if (IS_ERR(req)) { if (IS_ERR(req)) {
err = PTR_ERR(req); err = PTR_ERR(req);
goto out; goto out_unlock;
} }
osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false); pages[0] = folio_page(folio, 0);
osd_req_op_extent_osd_data_pages(req, 1, pages, len, 0, false, false);
{ {
__le64 xattr_buf = cpu_to_le64(inline_version); __le64 xattr_buf = cpu_to_le64(inline_version);
...@@ -1749,7 +1724,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) ...@@ -1749,7 +1724,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
CEPH_OSD_CMPXATTR_OP_GT, CEPH_OSD_CMPXATTR_OP_GT,
CEPH_OSD_CMPXATTR_MODE_U64); CEPH_OSD_CMPXATTR_MODE_U64);
if (err) if (err)
goto out_put; goto out_put_req;
} }
{ {
...@@ -1760,7 +1735,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) ...@@ -1760,7 +1735,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
"inline_version", "inline_version",
xattr_buf, xattr_len, 0, 0); xattr_buf, xattr_len, 0, 0);
if (err) if (err)
goto out_put; goto out_put_req;
} }
req->r_mtime = inode->i_mtime; req->r_mtime = inode->i_mtime;
...@@ -1771,19 +1746,28 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) ...@@ -1771,19 +1746,28 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency, ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
req->r_end_latency, len, err); req->r_end_latency, len, err);
out_put: if (!err) {
int dirty;
/* Set to CEPH_INLINE_NONE and dirty the caps */
down_read(&fsc->mdsc->snap_rwsem);
spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, &prealloc_cf);
spin_unlock(&ci->i_ceph_lock);
up_read(&fsc->mdsc->snap_rwsem);
if (dirty)
__mark_inode_dirty(inode, dirty);
}
out_put_req:
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
if (err == -ECANCELED) if (err == -ECANCELED)
err = 0; err = 0;
out_unlock:
folio_unlock(folio);
folio_put(folio);
out: out:
if (page && page != locked_page) { ceph_free_cap_flush(prealloc_cf);
if (from_pagecache) {
unlock_page(page);
put_page(page);
} else
__free_pages(page, 0);
}
dout("uninline_data %p %llx.%llx inline_version %llu = %d\n", dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
inode, ceph_vinop(inode), inline_version, err); inode, ceph_vinop(inode), inline_version, err);
return err; return err;
......
...@@ -1915,6 +1915,13 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ...@@ -1915,6 +1915,13 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
ceph_get_mds_session(session); ceph_get_mds_session(session);
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
/* Don't send messages until we get async create reply */
spin_unlock(&ci->i_ceph_lock);
ceph_put_mds_session(session);
return;
}
if (ci->i_ceph_flags & CEPH_I_FLUSH) if (ci->i_ceph_flags & CEPH_I_FLUSH)
flags |= CHECK_CAPS_FLUSH; flags |= CHECK_CAPS_FLUSH;
retry: retry:
...@@ -2409,6 +2416,9 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) ...@@ -2409,6 +2416,9 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
dout("write_inode %p wait=%d\n", inode, wait); dout("write_inode %p wait=%d\n", inode, wait);
ceph_fscache_unpin_writeback(inode, wbc); ceph_fscache_unpin_writeback(inode, wbc);
if (wait) { if (wait) {
err = ceph_wait_on_async_create(inode);
if (err)
return err;
dirty = try_flush_caps(inode, &flush_tid); dirty = try_flush_caps(inode, &flush_tid);
if (dirty) if (dirty)
err = wait_event_interruptible(ci->i_cap_wq, err = wait_event_interruptible(ci->i_cap_wq,
...@@ -2439,6 +2449,10 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc, ...@@ -2439,6 +2449,10 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
u64 first_tid = 0; u64 first_tid = 0;
u64 last_snap_flush = 0; u64 last_snap_flush = 0;
/* Don't do anything until create reply comes in */
if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE)
return;
ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
list_for_each_entry_reverse(cf, &ci->i_cap_flush_list, i_list) { list_for_each_entry_reverse(cf, &ci->i_cap_flush_list, i_list) {
...@@ -4152,7 +4166,6 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -4152,7 +4166,6 @@ void ceph_handle_caps(struct ceph_mds_session *session,
/* lookup ino */ /* lookup ino */
inode = ceph_find_inode(mdsc->fsc->sb, vino); inode = ceph_find_inode(mdsc->fsc->sb, vino);
ci = ceph_inode(inode);
dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino, dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino,
vino.snap, inode); vino.snap, inode);
...@@ -4178,6 +4191,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -4178,6 +4191,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
} }
goto flush_cap_releases; goto flush_cap_releases;
} }
ci = ceph_inode(inode);
/* these will work even if we don't have a cap yet */ /* these will work even if we don't have a cap yet */
switch (op) { switch (op) {
......
...@@ -175,7 +175,7 @@ static int metrics_latency_show(struct seq_file *s, void *p) ...@@ -175,7 +175,7 @@ static int metrics_latency_show(struct seq_file *s, void *p)
struct ceph_fs_client *fsc = s->private; struct ceph_fs_client *fsc = s->private;
struct ceph_client_metric *cm = &fsc->mdsc->metric; struct ceph_client_metric *cm = &fsc->mdsc->metric;
struct ceph_metric *m; struct ceph_metric *m;
s64 total, sum, avg, min, max, sq; s64 total, avg, min, max, sq;
int i; int i;
seq_printf(s, "item total avg_lat(us) min_lat(us) max_lat(us) stdev(us)\n"); seq_printf(s, "item total avg_lat(us) min_lat(us) max_lat(us) stdev(us)\n");
...@@ -185,8 +185,7 @@ static int metrics_latency_show(struct seq_file *s, void *p) ...@@ -185,8 +185,7 @@ static int metrics_latency_show(struct seq_file *s, void *p)
m = &cm->metric[i]; m = &cm->metric[i];
spin_lock(&m->lock); spin_lock(&m->lock);
total = m->total; total = m->total;
sum = m->latency_sum; avg = m->latency_avg;
avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
min = m->latency_min; min = m->latency_min;
max = m->latency_max; max = m->latency_max;
sq = m->latency_sq_sum; sq = m->latency_sq_sum;
......
...@@ -145,7 +145,7 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx, ...@@ -145,7 +145,7 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx,
return ERR_PTR(-EAGAIN); return ERR_PTR(-EAGAIN);
} }
/* reading/filling the cache are serialized by /* reading/filling the cache are serialized by
i_mutex, no need to use page lock */ i_rwsem, no need to use page lock */
unlock_page(cache_ctl->page); unlock_page(cache_ctl->page);
cache_ctl->dentries = kmap(cache_ctl->page); cache_ctl->dentries = kmap(cache_ctl->page);
} }
...@@ -155,7 +155,7 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx, ...@@ -155,7 +155,7 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx,
rcu_read_lock(); rcu_read_lock();
spin_lock(&parent->d_lock); spin_lock(&parent->d_lock);
/* check i_size again here, because empty directory can be /* check i_size again here, because empty directory can be
* marked as complete while not holding the i_mutex. */ * marked as complete while not holding the i_rwsem. */
if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir)) if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir))
dentry = cache_ctl->dentries[cache_ctl->index]; dentry = cache_ctl->dentries[cache_ctl->index];
else else
...@@ -478,8 +478,11 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ...@@ -478,8 +478,11 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
2 : (fpos_off(rde->offset) + 1); 2 : (fpos_off(rde->offset) + 1);
err = note_last_dentry(dfi, rde->name, rde->name_len, err = note_last_dentry(dfi, rde->name, rde->name_len,
next_offset); next_offset);
if (err) if (err) {
ceph_mdsc_put_request(dfi->last_readdir);
dfi->last_readdir = NULL;
return err; return err;
}
} else if (req->r_reply_info.dir_end) { } else if (req->r_reply_info.dir_end) {
dfi->next_offset = 2; dfi->next_offset = 2;
/* keep last name */ /* keep last name */
...@@ -520,6 +523,12 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ...@@ -520,6 +523,12 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
if (!dir_emit(ctx, rde->name, rde->name_len, if (!dir_emit(ctx, rde->name, rde->name_len,
ceph_present_ino(inode->i_sb, le64_to_cpu(rde->inode.in->ino)), ceph_present_ino(inode->i_sb, le64_to_cpu(rde->inode.in->ino)),
le32_to_cpu(rde->inode.in->mode) >> 12)) { le32_to_cpu(rde->inode.in->mode) >> 12)) {
/*
* NOTE: There is no need to put 'dfi->last_readdir' here:
* when dir_emit stops us, it most likely doesn't have
* enough memory, etc., so the next readdir will simply
* continue from this point.
*/
dout("filldir stopping us...\n"); dout("filldir stopping us...\n");
return 0; return 0;
} }
...@@ -671,7 +680,7 @@ struct dentry *ceph_handle_snapdir(struct ceph_mds_request *req, ...@@ -671,7 +680,7 @@ struct dentry *ceph_handle_snapdir(struct ceph_mds_request *req,
struct dentry *dentry) struct dentry *dentry)
{ {
struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
struct inode *parent = d_inode(dentry->d_parent); /* we hold i_mutex */ struct inode *parent = d_inode(dentry->d_parent); /* we hold i_rwsem */
/* .snap dir? */ /* .snap dir? */
if (ceph_snap(parent) == CEPH_NOSNAP && if (ceph_snap(parent) == CEPH_NOSNAP &&
......
...@@ -207,6 +207,7 @@ static int ceph_init_file_info(struct inode *inode, struct file *file, ...@@ -207,6 +207,7 @@ static int ceph_init_file_info(struct inode *inode, struct file *file,
struct ceph_mount_options *opt = struct ceph_mount_options *opt =
ceph_inode_to_client(&ci->vfs_inode)->mount_options; ceph_inode_to_client(&ci->vfs_inode)->mount_options;
struct ceph_file_info *fi; struct ceph_file_info *fi;
int ret;
dout("%s %p %p 0%o (%s)\n", __func__, inode, file, dout("%s %p %p 0%o (%s)\n", __func__, inode, file,
inode->i_mode, isdir ? "dir" : "regular"); inode->i_mode, isdir ? "dir" : "regular");
...@@ -240,7 +241,22 @@ static int ceph_init_file_info(struct inode *inode, struct file *file, ...@@ -240,7 +241,22 @@ static int ceph_init_file_info(struct inode *inode, struct file *file,
INIT_LIST_HEAD(&fi->rw_contexts); INIT_LIST_HEAD(&fi->rw_contexts);
fi->filp_gen = READ_ONCE(ceph_inode_to_client(inode)->filp_gen); fi->filp_gen = READ_ONCE(ceph_inode_to_client(inode)->filp_gen);
if ((file->f_mode & FMODE_WRITE) &&
ci->i_inline_version != CEPH_INLINE_NONE) {
ret = ceph_uninline_data(file);
if (ret < 0)
goto error;
}
return 0; return 0;
error:
ceph_fscache_unuse_cookie(inode, file->f_mode & FMODE_WRITE);
ceph_put_fmode(ci, fi->fmode, 1);
kmem_cache_free(ceph_file_cachep, fi);
/* wake up anyone waiting for caps on this inode */
wake_up_all(&ci->i_cap_wq);
return ret;
} }
/* /*
...@@ -516,52 +532,67 @@ static void restore_deleg_ino(struct inode *dir, u64 ino) ...@@ -516,52 +532,67 @@ static void restore_deleg_ino(struct inode *dir, u64 ino)
} }
} }
/*
 * Under i_ceph_lock: if the inode is still flagged CEPH_I_ASYNC_CREATE,
 * clear the flag and wake anyone waiting on CEPH_ASYNC_CREATE_BIT, then
 * kick the inode's flushing caps on the given session.
 */
static void wake_async_create_waiters(struct inode *inode,
				      struct ceph_mds_session *session)
{
	struct ceph_inode_info *cinfo = ceph_inode(inode);

	spin_lock(&cinfo->i_ceph_lock);

	/* Only clear the flag and wake waiters if it is still set. */
	if (cinfo->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
		cinfo->i_ceph_flags &= ~CEPH_I_ASYNC_CREATE;
		wake_up_bit(&cinfo->i_ceph_flags, CEPH_ASYNC_CREATE_BIT);
	}

	/* Done unconditionally, still with i_ceph_lock held. */
	ceph_kick_flushing_inode_caps(session, cinfo);

	spin_unlock(&cinfo->i_ceph_lock);
}
static void ceph_async_create_cb(struct ceph_mds_client *mdsc, static void ceph_async_create_cb(struct ceph_mds_client *mdsc,
struct ceph_mds_request *req) struct ceph_mds_request *req)
{ {
struct dentry *dentry = req->r_dentry;
struct inode *dinode = d_inode(dentry);
struct inode *tinode = req->r_target_inode;
int result = req->r_err ? req->r_err : int result = req->r_err ? req->r_err :
le32_to_cpu(req->r_reply_info.head->result); le32_to_cpu(req->r_reply_info.head->result);
WARN_ON_ONCE(dinode && tinode && dinode != tinode);
/* MDS changed -- caller must resubmit */
if (result == -EJUKEBOX) if (result == -EJUKEBOX)
goto out; goto out;
mapping_set_error(req->r_parent->i_mapping, result); mapping_set_error(req->r_parent->i_mapping, result);
if (result) { if (result) {
struct dentry *dentry = req->r_dentry;
struct inode *inode = d_inode(dentry);
int pathlen = 0; int pathlen = 0;
u64 base = 0; u64 base = 0;
char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen, char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
&base, 0); &base, 0);
pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n",
base, IS_ERR(path) ? "<<bad>>" : path, result);
ceph_mdsc_free_path(path, pathlen);
ceph_dir_clear_complete(req->r_parent); ceph_dir_clear_complete(req->r_parent);
if (!d_unhashed(dentry)) if (!d_unhashed(dentry))
d_drop(dentry); d_drop(dentry);
ceph_inode_shutdown(inode); if (dinode) {
mapping_set_error(dinode->i_mapping, result);
pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n", ceph_inode_shutdown(dinode);
base, IS_ERR(path) ? "<<bad>>" : path, result); wake_async_create_waiters(dinode, req->r_session);
ceph_mdsc_free_path(path, pathlen); }
} }
if (req->r_target_inode) { if (tinode) {
struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); u64 ino = ceph_vino(tinode).ino;
u64 ino = ceph_vino(req->r_target_inode).ino;
if (req->r_deleg_ino != ino) if (req->r_deleg_ino != ino)
pr_warn("%s: inode number mismatch! err=%d deleg_ino=0x%llx target=0x%llx\n", pr_warn("%s: inode number mismatch! err=%d deleg_ino=0x%llx target=0x%llx\n",
__func__, req->r_err, req->r_deleg_ino, ino); __func__, req->r_err, req->r_deleg_ino, ino);
mapping_set_error(req->r_target_inode->i_mapping, result);
spin_lock(&ci->i_ceph_lock); mapping_set_error(tinode->i_mapping, result);
if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) { wake_async_create_waiters(tinode, req->r_session);
ci->i_ceph_flags &= ~CEPH_I_ASYNC_CREATE;
wake_up_bit(&ci->i_ceph_flags, CEPH_ASYNC_CREATE_BIT);
}
ceph_kick_flushing_inode_caps(req->r_session, ci);
spin_unlock(&ci->i_ceph_lock);
} else if (!result) { } else if (!result) {
pr_warn("%s: no req->r_target_inode for 0x%llx\n", __func__, pr_warn("%s: no req->r_target_inode for 0x%llx\n", __func__,
req->r_deleg_ino); req->r_deleg_ino);
...@@ -1041,7 +1072,6 @@ static void ceph_aio_complete(struct inode *inode, ...@@ -1041,7 +1072,6 @@ static void ceph_aio_complete(struct inode *inode,
} }
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
&aio_req->prealloc_cf); &aio_req->prealloc_cf);
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
...@@ -1778,12 +1808,6 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) ...@@ -1778,12 +1808,6 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
if (err) if (err)
goto out; goto out;
if (ci->i_inline_version != CEPH_INLINE_NONE) {
err = ceph_uninline_data(file, NULL);
if (err < 0)
goto out;
}
dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n", dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
inode, ceph_vinop(inode), pos, count, i_size_read(inode)); inode, ceph_vinop(inode), pos, count, i_size_read(inode));
if (!(fi->flags & CEPH_F_SYNC) && !direct_lock) if (!(fi->flags & CEPH_F_SYNC) && !direct_lock)
...@@ -1855,7 +1879,6 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) ...@@ -1855,7 +1879,6 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
int dirty; int dirty;
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
&prealloc_cf); &prealloc_cf);
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
...@@ -2109,12 +2132,6 @@ static long ceph_fallocate(struct file *file, int mode, ...@@ -2109,12 +2132,6 @@ static long ceph_fallocate(struct file *file, int mode,
goto unlock; goto unlock;
} }
if (ci->i_inline_version != CEPH_INLINE_NONE) {
ret = ceph_uninline_data(file, NULL);
if (ret < 0)
goto unlock;
}
size = i_size_read(inode); size = i_size_read(inode);
/* Are we punching a hole beyond EOF? */ /* Are we punching a hole beyond EOF? */
...@@ -2139,7 +2156,6 @@ static long ceph_fallocate(struct file *file, int mode, ...@@ -2139,7 +2156,6 @@ static long ceph_fallocate(struct file *file, int mode,
if (!ret) { if (!ret) {
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
&prealloc_cf); &prealloc_cf);
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
...@@ -2532,7 +2548,6 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off, ...@@ -2532,7 +2548,6 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
} }
/* Mark Fw dirty */ /* Mark Fw dirty */
spin_lock(&dst_ci->i_ceph_lock); spin_lock(&dst_ci->i_ceph_lock);
dst_ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf); dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
spin_unlock(&dst_ci->i_ceph_lock); spin_unlock(&dst_ci->i_ceph_lock);
if (dirty) if (dirty)
......
...@@ -87,13 +87,13 @@ struct inode *ceph_get_snapdir(struct inode *parent) ...@@ -87,13 +87,13 @@ struct inode *ceph_get_snapdir(struct inode *parent)
if (!S_ISDIR(parent->i_mode)) { if (!S_ISDIR(parent->i_mode)) {
pr_warn_once("bad snapdir parent type (mode=0%o)\n", pr_warn_once("bad snapdir parent type (mode=0%o)\n",
parent->i_mode); parent->i_mode);
return ERR_PTR(-ENOTDIR); goto err;
} }
if (!(inode->i_state & I_NEW) && !S_ISDIR(inode->i_mode)) { if (!(inode->i_state & I_NEW) && !S_ISDIR(inode->i_mode)) {
pr_warn_once("bad snapdir inode type (mode=0%o)\n", pr_warn_once("bad snapdir inode type (mode=0%o)\n",
inode->i_mode); inode->i_mode);
return ERR_PTR(-ENOTDIR); goto err;
} }
inode->i_mode = parent->i_mode; inode->i_mode = parent->i_mode;
...@@ -113,6 +113,12 @@ struct inode *ceph_get_snapdir(struct inode *parent) ...@@ -113,6 +113,12 @@ struct inode *ceph_get_snapdir(struct inode *parent)
} }
return inode; return inode;
err:
if ((inode->i_state & I_NEW))
discard_new_inode(inode);
else
iput(inode);
return ERR_PTR(-ENOTDIR);
} }
const struct inode_operations ceph_file_iops = { const struct inode_operations ceph_file_iops = {
...@@ -1201,7 +1207,7 @@ static void update_dentry_lease_careful(struct dentry *dentry, ...@@ -1201,7 +1207,7 @@ static void update_dentry_lease_careful(struct dentry *dentry,
/* /*
* splice a dentry to an inode. * splice a dentry to an inode.
* caller must hold directory i_mutex for this to be safe. * caller must hold directory i_rwsem for this to be safe.
*/ */
static int splice_dentry(struct dentry **pdn, struct inode *in) static int splice_dentry(struct dentry **pdn, struct inode *in)
{ {
...@@ -1598,7 +1604,7 @@ static int fill_readdir_cache(struct inode *dir, struct dentry *dn, ...@@ -1598,7 +1604,7 @@ static int fill_readdir_cache(struct inode *dir, struct dentry *dn,
return idx == 0 ? -ENOMEM : 0; return idx == 0 ? -ENOMEM : 0;
} }
/* reading/filling the cache are serialized by /* reading/filling the cache are serialized by
* i_mutex, no need to use page lock */ * i_rwsem, no need to use page lock */
unlock_page(ctl->page); unlock_page(ctl->page);
ctl->dentries = kmap(ctl->page); ctl->dentries = kmap(ctl->page);
if (idx == 0) if (idx == 0)
...@@ -2301,6 +2307,57 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page, ...@@ -2301,6 +2307,57 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
return err; return err;
} }
/*
 * Ask the MDS for the value of a virtual xattr via a
 * CEPH_MDS_OP_GETVXATTR request.
 *
 * @inode: inode the request is issued against; ihold() is called on it
 *         before it is stored in req->r_inode
 * @name:  xattr name, duplicated into req->r_path2
 * @value: destination buffer; written only when @size is non-zero and
 *         the returned value fits
 * @size:  capacity of @value in bytes; 0 means "only report the length"
 *
 * Returns the length of the value on success, -ERANGE if the value is
 * larger than @size, -ENOMEM if @name cannot be duplicated, or the
 * negative error reported by request creation or by
 * ceph_mdsc_do_request().
 */
int ceph_do_getvxattr(struct inode *inode, const char *name, void *value,
		      size_t size)
{
	struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_mds_request *req;
	int mode = USE_AUTH_MDS;
	int err;
	char *xattr_value;
	size_t xattr_value_len;

	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETVXATTR, mode);
	if (IS_ERR(req)) {
		/* hand back the real failure code instead of assuming -ENOMEM */
		err = PTR_ERR(req);
		goto out;
	}

	req->r_path2 = kstrdup(name, GFP_NOFS);
	if (!req->r_path2) {
		err = -ENOMEM;
		goto put;
	}

	ihold(inode);
	req->r_inode = inode;
	err = ceph_mdsc_do_request(mdsc, NULL, req);
	if (err < 0)
		goto put;

	xattr_value = req->r_reply_info.xattr_info.xattr_value;
	xattr_value_len = req->r_reply_info.xattr_info.xattr_value_len;

	dout("do_getvxattr xattr_value_len:%zu, size:%zu\n", xattr_value_len, size);

	/* the length is the result both for size queries and real reads */
	err = (int)xattr_value_len;
	if (size == 0)
		goto put;

	if (xattr_value_len > size) {
		err = -ERANGE;
		goto put;
	}

	/* copy out before the request (and its reply info) is released */
	memcpy(value, xattr_value, xattr_value_len);
put:
	ceph_mdsc_put_request(req);
out:
	dout("do_getvxattr result=%d\n", err);
	return err;
}
/* /*
* Check inode permissions. We verify we have a valid value for * Check inode permissions. We verify we have a valid value for
......
...@@ -111,10 +111,10 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct inode *inode, ...@@ -111,10 +111,10 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct inode *inode,
req->r_args.filelock_change.length = cpu_to_le64(length); req->r_args.filelock_change.length = cpu_to_le64(length);
req->r_args.filelock_change.wait = wait; req->r_args.filelock_change.wait = wait;
if (wait) err = ceph_mdsc_submit_request(mdsc, inode, req);
req->r_wait_for_completion = ceph_lock_wait_for_completion; if (!err)
err = ceph_mdsc_wait_request(mdsc, req, wait ?
err = ceph_mdsc_do_request(mdsc, inode, req); ceph_lock_wait_for_completion : NULL);
if (!err && operation == CEPH_MDS_OP_GETFILELOCK) { if (!err && operation == CEPH_MDS_OP_GETFILELOCK) {
fl->fl_pid = -le64_to_cpu(req->r_reply_info.filelock_reply->pid); fl->fl_pid = -le64_to_cpu(req->r_reply_info.filelock_reply->pid);
if (CEPH_LOCK_SHARED == req->r_reply_info.filelock_reply->type) if (CEPH_LOCK_SHARED == req->r_reply_info.filelock_reply->type)
......
...@@ -555,6 +555,28 @@ static int parse_reply_info_create(void **p, void *end, ...@@ -555,6 +555,28 @@ static int parse_reply_info_create(void **p, void *end,
return -EIO; return -EIO;
} }
/*
 * Decode the extra payload of a getvxattr reply.
 *
 * Skips two version bytes and the 32-bit payload length, then reads the
 * 32-bit value length. The value must occupy exactly the remaining bytes
 * up to @end; in that case info->xattr_info is pointed at the value
 * inside the buffer (no copy is made) and *p is advanced to @end.
 *
 * Returns the value length on success, -EIO on a short or inconsistent
 * buffer. @features is not used here.
 */
static int parse_reply_info_getvxattr(void **p, void *end,
				      struct ceph_mds_reply_info_parsed *info,
				      u64 features)
{
	u32 len;

	ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
	ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
	ceph_decode_skip_32(p, end, bad); /* skip payload length */
	ceph_decode_32_safe(p, end, len, bad);

	/* the value has to fill the rest of the buffer exactly */
	if (len != end - *p)
		goto bad;

	info->xattr_info.xattr_value = *p;
	info->xattr_info.xattr_value_len = len;
	*p = end;
	return len;

bad:
	return -EIO;
}
/* /*
* parse extra results * parse extra results
*/ */
...@@ -570,6 +592,8 @@ static int parse_reply_info_extra(void **p, void *end, ...@@ -570,6 +592,8 @@ static int parse_reply_info_extra(void **p, void *end,
return parse_reply_info_readdir(p, end, info, features); return parse_reply_info_readdir(p, end, info, features);
else if (op == CEPH_MDS_OP_CREATE) else if (op == CEPH_MDS_OP_CREATE)
return parse_reply_info_create(p, end, info, features, s); return parse_reply_info_create(p, end, info, features, s);
else if (op == CEPH_MDS_OP_GETVXATTR)
return parse_reply_info_getvxattr(p, end, info, features);
else else
return -EIO; return -EIO;
} }
...@@ -2178,7 +2202,8 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, ...@@ -2178,7 +2202,8 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
order = get_order(size * num_entries); order = get_order(size * num_entries);
while (order >= 0) { while (order >= 0) {
rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
__GFP_NOWARN, __GFP_NOWARN |
__GFP_ZERO,
order); order);
if (rinfo->dir_entries) if (rinfo->dir_entries)
break; break;
...@@ -2946,15 +2971,16 @@ int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, ...@@ -2946,15 +2971,16 @@ int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
return err; return err;
} }
static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
struct ceph_mds_request *req) struct ceph_mds_request *req,
ceph_mds_request_wait_callback_t wait_func)
{ {
int err; int err;
/* wait */ /* wait */
dout("do_request waiting\n"); dout("do_request waiting\n");
if (!req->r_timeout && req->r_wait_for_completion) { if (wait_func) {
err = req->r_wait_for_completion(mdsc, req); err = wait_func(mdsc, req);
} else { } else {
long timeleft = wait_for_completion_killable_timeout( long timeleft = wait_for_completion_killable_timeout(
&req->r_completion, &req->r_completion,
...@@ -3011,7 +3037,7 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, ...@@ -3011,7 +3037,7 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
/* issue */ /* issue */
err = ceph_mdsc_submit_request(mdsc, dir, req); err = ceph_mdsc_submit_request(mdsc, dir, req);
if (!err) if (!err)
err = ceph_mdsc_wait_request(mdsc, req); err = ceph_mdsc_wait_request(mdsc, req, NULL);
dout("do_request %p done, result %d\n", req, err); dout("do_request %p done, result %d\n", req, err);
return err; return err;
} }
...@@ -3097,35 +3123,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) ...@@ -3097,35 +3123,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
result = le32_to_cpu(head->result); result = le32_to_cpu(head->result);
/*
* Handle an ESTALE
* if we're not talking to the authority, send to them
* if the authority has changed while we weren't looking,
* send to new authority
* Otherwise we just have to return an ESTALE
*/
if (result == -ESTALE) {
dout("got ESTALE on request %llu\n", req->r_tid);
req->r_resend_mds = -1;
if (req->r_direct_mode != USE_AUTH_MDS) {
dout("not using auth, setting for that now\n");
req->r_direct_mode = USE_AUTH_MDS;
__do_request(mdsc, req);
mutex_unlock(&mdsc->mutex);
goto out;
} else {
int mds = __choose_mds(mdsc, req, NULL);
if (mds >= 0 && mds != req->r_session->s_mds) {
dout("but auth changed, so resending\n");
__do_request(mdsc, req);
mutex_unlock(&mdsc->mutex);
goto out;
}
}
dout("have to return ESTALE on request %llu\n", req->r_tid);
}
if (head->safe) { if (head->safe) {
set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
__unregister_request(mdsc, req); __unregister_request(mdsc, req);
...@@ -4841,7 +4838,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) ...@@ -4841,7 +4838,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
mutex_unlock(&mdsc->mutex); mutex_unlock(&mdsc->mutex);
ceph_cleanup_snapid_map(mdsc); ceph_cleanup_snapid_map(mdsc);
ceph_cleanup_empty_realms(mdsc); ceph_cleanup_global_and_empty_realms(mdsc);
cancel_work_sync(&mdsc->cap_reclaim_work); cancel_work_sync(&mdsc->cap_reclaim_work);
cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
......
...@@ -100,6 +100,11 @@ struct ceph_mds_reply_dir_entry { ...@@ -100,6 +100,11 @@ struct ceph_mds_reply_dir_entry {
loff_t offset; loff_t offset;
}; };
/*
 * Value carried by a getvxattr reply, as filled in by
 * parse_reply_info_getvxattr().
 */
struct ceph_mds_reply_xattr {
/* points at the value inside the buffer being decoded; not a separate copy */
char *xattr_value;
/* length of the value in bytes */
size_t xattr_value_len;
};
/* /*
* parsed info about an mds reply, including information about * parsed info about an mds reply, including information about
* either: 1) the target inode and/or its parent directory and dentry, * either: 1) the target inode and/or its parent directory and dentry,
...@@ -115,6 +120,7 @@ struct ceph_mds_reply_info_parsed { ...@@ -115,6 +120,7 @@ struct ceph_mds_reply_info_parsed {
char *dname; char *dname;
u32 dname_len; u32 dname_len;
struct ceph_mds_reply_lease *dlease; struct ceph_mds_reply_lease *dlease;
struct ceph_mds_reply_xattr xattr_info;
/* extra */ /* extra */
union { union {
...@@ -274,8 +280,8 @@ struct ceph_mds_request { ...@@ -274,8 +280,8 @@ struct ceph_mds_request {
union ceph_mds_request_args r_args; union ceph_mds_request_args r_args;
int r_fmode; /* file mode, if expecting cap */ int r_fmode; /* file mode, if expecting cap */
const struct cred *r_cred;
int r_request_release_offset; int r_request_release_offset;
const struct cred *r_cred;
struct timespec64 r_stamp; struct timespec64 r_stamp;
/* for choosing which mds to send this request to */ /* for choosing which mds to send this request to */
...@@ -296,12 +302,11 @@ struct ceph_mds_request { ...@@ -296,12 +302,11 @@ struct ceph_mds_request {
struct ceph_msg *r_reply; struct ceph_msg *r_reply;
struct ceph_mds_reply_info_parsed r_reply_info; struct ceph_mds_reply_info_parsed r_reply_info;
int r_err; int r_err;
u32 r_readdir_offset;
struct page *r_locked_page; struct page *r_locked_page;
int r_dir_caps; int r_dir_caps;
int r_num_caps; int r_num_caps;
u32 r_readdir_offset;
unsigned long r_timeout; /* optional. jiffies, 0 is "wait forever" */ unsigned long r_timeout; /* optional. jiffies, 0 is "wait forever" */
unsigned long r_started; /* start time to measure timeout against */ unsigned long r_started; /* start time to measure timeout against */
...@@ -329,7 +334,6 @@ struct ceph_mds_request { ...@@ -329,7 +334,6 @@ struct ceph_mds_request {
struct completion r_completion; struct completion r_completion;
struct completion r_safe_completion; struct completion r_safe_completion;
ceph_mds_request_callback_t r_callback; ceph_mds_request_callback_t r_callback;
ceph_mds_request_wait_callback_t r_wait_for_completion;
struct list_head r_unsafe_item; /* per-session unsafe list item */ struct list_head r_unsafe_item; /* per-session unsafe list item */
long long r_dir_release_cnt; long long r_dir_release_cnt;
...@@ -507,6 +511,9 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode); ...@@ -507,6 +511,9 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode);
extern int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, extern int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
struct inode *dir, struct inode *dir,
struct ceph_mds_request *req); struct ceph_mds_request *req);
int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
struct ceph_mds_request *req,
ceph_mds_request_wait_callback_t wait_func);
extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
struct inode *dir, struct inode *dir,
struct ceph_mds_request *req); struct ceph_mds_request *req);
......
...@@ -8,6 +8,12 @@ ...@@ -8,6 +8,12 @@
#include "metric.h" #include "metric.h"
#include "mds_client.h" #include "mds_client.h"
/*
 * Convert @val to a timespec64 and encode it into @ts with
 * ceph_encode_timespec64().
 */
static void ktime_to_ceph_timespec(struct ceph_timespec *ts, ktime_t val)
{
	struct timespec64 tmp;

	tmp = ktime_to_timespec64(val);
	ceph_encode_timespec64(ts, &tmp);
}
static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc, static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
struct ceph_mds_session *s) struct ceph_mds_session *s)
{ {
...@@ -26,7 +32,6 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc, ...@@ -26,7 +32,6 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
u64 nr_caps = atomic64_read(&m->total_caps); u64 nr_caps = atomic64_read(&m->total_caps);
u32 header_len = sizeof(struct ceph_metric_header); u32 header_len = sizeof(struct ceph_metric_header);
struct ceph_msg *msg; struct ceph_msg *msg;
struct timespec64 ts;
s64 sum; s64 sum;
s32 items = 0; s32 items = 0;
s32 len; s32 len;
...@@ -59,37 +64,40 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc, ...@@ -59,37 +64,40 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
/* encode the read latency metric */ /* encode the read latency metric */
read = (struct ceph_metric_read_latency *)(cap + 1); read = (struct ceph_metric_read_latency *)(cap + 1);
read->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_LATENCY); read->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_LATENCY);
read->header.ver = 1; read->header.ver = 2;
read->header.compat = 1; read->header.compat = 1;
read->header.data_len = cpu_to_le32(sizeof(*read) - header_len); read->header.data_len = cpu_to_le32(sizeof(*read) - header_len);
sum = m->metric[METRIC_READ].latency_sum; sum = m->metric[METRIC_READ].latency_sum;
jiffies_to_timespec64(sum, &ts); ktime_to_ceph_timespec(&read->lat, sum);
read->sec = cpu_to_le32(ts.tv_sec); ktime_to_ceph_timespec(&read->avg, m->metric[METRIC_READ].latency_avg);
read->nsec = cpu_to_le32(ts.tv_nsec); read->sq_sum = cpu_to_le64(m->metric[METRIC_READ].latency_sq_sum);
read->count = cpu_to_le64(m->metric[METRIC_READ].total);
items++; items++;
/* encode the write latency metric */ /* encode the write latency metric */
write = (struct ceph_metric_write_latency *)(read + 1); write = (struct ceph_metric_write_latency *)(read + 1);
write->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_LATENCY); write->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_LATENCY);
write->header.ver = 1; write->header.ver = 2;
write->header.compat = 1; write->header.compat = 1;
write->header.data_len = cpu_to_le32(sizeof(*write) - header_len); write->header.data_len = cpu_to_le32(sizeof(*write) - header_len);
sum = m->metric[METRIC_WRITE].latency_sum; sum = m->metric[METRIC_WRITE].latency_sum;
jiffies_to_timespec64(sum, &ts); ktime_to_ceph_timespec(&write->lat, sum);
write->sec = cpu_to_le32(ts.tv_sec); ktime_to_ceph_timespec(&write->avg, m->metric[METRIC_WRITE].latency_avg);
write->nsec = cpu_to_le32(ts.tv_nsec); write->sq_sum = cpu_to_le64(m->metric[METRIC_WRITE].latency_sq_sum);
write->count = cpu_to_le64(m->metric[METRIC_WRITE].total);
items++; items++;
/* encode the metadata latency metric */ /* encode the metadata latency metric */
meta = (struct ceph_metric_metadata_latency *)(write + 1); meta = (struct ceph_metric_metadata_latency *)(write + 1);
meta->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_METADATA_LATENCY); meta->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_METADATA_LATENCY);
meta->header.ver = 1; meta->header.ver = 2;
meta->header.compat = 1; meta->header.compat = 1;
meta->header.data_len = cpu_to_le32(sizeof(*meta) - header_len); meta->header.data_len = cpu_to_le32(sizeof(*meta) - header_len);
sum = m->metric[METRIC_METADATA].latency_sum; sum = m->metric[METRIC_METADATA].latency_sum;
jiffies_to_timespec64(sum, &ts); ktime_to_ceph_timespec(&meta->lat, sum);
meta->sec = cpu_to_le32(ts.tv_sec); ktime_to_ceph_timespec(&meta->avg, m->metric[METRIC_METADATA].latency_avg);
meta->nsec = cpu_to_le32(ts.tv_nsec); meta->sq_sum = cpu_to_le64(m->metric[METRIC_METADATA].latency_sq_sum);
meta->count = cpu_to_le64(m->metric[METRIC_METADATA].total);
items++; items++;
/* encode the dentry lease metric */ /* encode the dentry lease metric */
...@@ -250,6 +258,7 @@ int ceph_metric_init(struct ceph_client_metric *m) ...@@ -250,6 +258,7 @@ int ceph_metric_init(struct ceph_client_metric *m)
metric->size_max = 0; metric->size_max = 0;
metric->total = 0; metric->total = 0;
metric->latency_sum = 0; metric->latency_sum = 0;
metric->latency_avg = 0;
metric->latency_sq_sum = 0; metric->latency_sq_sum = 0;
metric->latency_min = KTIME_MAX; metric->latency_min = KTIME_MAX;
metric->latency_max = 0; metric->latency_max = 0;
...@@ -307,20 +316,19 @@ void ceph_metric_destroy(struct ceph_client_metric *m) ...@@ -307,20 +316,19 @@ void ceph_metric_destroy(struct ceph_client_metric *m)
max = new; \ max = new; \
} }
static inline void __update_stdev(ktime_t total, ktime_t lsum, static inline void __update_mean_and_stdev(ktime_t total, ktime_t *lavg,
ktime_t *sq_sump, ktime_t lat) ktime_t *sq_sump, ktime_t lat)
{ {
ktime_t avg, sq; ktime_t avg;
if (unlikely(total == 1)) if (unlikely(total == 1)) {
return; *lavg = lat;
} else {
/* the sq is (lat - old_avg) * (lat - new_avg) */ /* the sq is (lat - old_avg) * (lat - new_avg) */
avg = DIV64_U64_ROUND_CLOSEST((lsum - lat), (total - 1)); avg = *lavg + div64_s64(lat - *lavg, total);
sq = lat - avg; *sq_sump += (lat - *lavg)*(lat - avg);
avg = DIV64_U64_ROUND_CLOSEST(lsum, total); *lavg = avg;
sq = sq * (lat - avg); }
*sq_sump += sq;
} }
void ceph_update_metrics(struct ceph_metric *m, void ceph_update_metrics(struct ceph_metric *m,
...@@ -339,6 +347,7 @@ void ceph_update_metrics(struct ceph_metric *m, ...@@ -339,6 +347,7 @@ void ceph_update_metrics(struct ceph_metric *m,
METRIC_UPDATE_MIN_MAX(m->size_min, m->size_max, size); METRIC_UPDATE_MIN_MAX(m->size_min, m->size_max, size);
m->latency_sum += lat; m->latency_sum += lat;
METRIC_UPDATE_MIN_MAX(m->latency_min, m->latency_max, lat); METRIC_UPDATE_MIN_MAX(m->latency_min, m->latency_max, lat);
__update_stdev(total, m->latency_sum, &m->latency_sq_sum, lat); __update_mean_and_stdev(total, &m->latency_avg, &m->latency_sq_sum,
lat);
spin_unlock(&m->lock); spin_unlock(&m->lock);
} }
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
#ifndef _FS_CEPH_MDS_METRIC_H #ifndef _FS_CEPH_MDS_METRIC_H
#define _FS_CEPH_MDS_METRIC_H #define _FS_CEPH_MDS_METRIC_H
#include <linux/types.h> #include <linux/ceph/types.h>
#include <linux/percpu_counter.h> #include <linux/percpu_counter.h>
#include <linux/ktime.h> #include <linux/ktime.h>
...@@ -19,27 +19,39 @@ enum ceph_metric_type { ...@@ -19,27 +19,39 @@ enum ceph_metric_type {
CLIENT_METRIC_TYPE_OPENED_INODES, CLIENT_METRIC_TYPE_OPENED_INODES,
CLIENT_METRIC_TYPE_READ_IO_SIZES, CLIENT_METRIC_TYPE_READ_IO_SIZES,
CLIENT_METRIC_TYPE_WRITE_IO_SIZES, CLIENT_METRIC_TYPE_WRITE_IO_SIZES,
CLIENT_METRIC_TYPE_AVG_READ_LATENCY,
CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_WRITE_IO_SIZES, CLIENT_METRIC_TYPE_STDEV_READ_LATENCY,
CLIENT_METRIC_TYPE_AVG_WRITE_LATENCY,
CLIENT_METRIC_TYPE_STDEV_WRITE_LATENCY,
CLIENT_METRIC_TYPE_AVG_METADATA_LATENCY,
CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY,
CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY,
}; };
/* /*
* This will always have the highest metric bit value * This will always have the highest metric bit value
* as the last element of the array. * as the last element of the array.
*/ */
#define CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED { \ #define CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED { \
CLIENT_METRIC_TYPE_CAP_INFO, \ CLIENT_METRIC_TYPE_CAP_INFO, \
CLIENT_METRIC_TYPE_READ_LATENCY, \ CLIENT_METRIC_TYPE_READ_LATENCY, \
CLIENT_METRIC_TYPE_WRITE_LATENCY, \ CLIENT_METRIC_TYPE_WRITE_LATENCY, \
CLIENT_METRIC_TYPE_METADATA_LATENCY, \ CLIENT_METRIC_TYPE_METADATA_LATENCY, \
CLIENT_METRIC_TYPE_DENTRY_LEASE, \ CLIENT_METRIC_TYPE_DENTRY_LEASE, \
CLIENT_METRIC_TYPE_OPENED_FILES, \ CLIENT_METRIC_TYPE_OPENED_FILES, \
CLIENT_METRIC_TYPE_PINNED_ICAPS, \ CLIENT_METRIC_TYPE_PINNED_ICAPS, \
CLIENT_METRIC_TYPE_OPENED_INODES, \ CLIENT_METRIC_TYPE_OPENED_INODES, \
CLIENT_METRIC_TYPE_READ_IO_SIZES, \ CLIENT_METRIC_TYPE_READ_IO_SIZES, \
CLIENT_METRIC_TYPE_WRITE_IO_SIZES, \ CLIENT_METRIC_TYPE_WRITE_IO_SIZES, \
\ CLIENT_METRIC_TYPE_AVG_READ_LATENCY, \
CLIENT_METRIC_TYPE_MAX, \ CLIENT_METRIC_TYPE_STDEV_READ_LATENCY, \
CLIENT_METRIC_TYPE_AVG_WRITE_LATENCY, \
CLIENT_METRIC_TYPE_STDEV_WRITE_LATENCY, \
CLIENT_METRIC_TYPE_AVG_METADATA_LATENCY, \
CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY, \
\
CLIENT_METRIC_TYPE_MAX, \
} }
struct ceph_metric_header { struct ceph_metric_header {
...@@ -60,22 +72,28 @@ struct ceph_metric_cap { ...@@ -60,22 +72,28 @@ struct ceph_metric_cap {
/* metric read latency header */ /* metric read latency header */
struct ceph_metric_read_latency { struct ceph_metric_read_latency {
struct ceph_metric_header header; struct ceph_metric_header header;
__le32 sec; struct ceph_timespec lat;
__le32 nsec; struct ceph_timespec avg;
__le64 sq_sum;
__le64 count;
} __packed; } __packed;
/* metric write latency header */ /* metric write latency header */
struct ceph_metric_write_latency { struct ceph_metric_write_latency {
struct ceph_metric_header header; struct ceph_metric_header header;
__le32 sec; struct ceph_timespec lat;
__le32 nsec; struct ceph_timespec avg;
__le64 sq_sum;
__le64 count;
} __packed; } __packed;
/* metric metadata latency header */ /* metric metadata latency header */
struct ceph_metric_metadata_latency { struct ceph_metric_metadata_latency {
struct ceph_metric_header header; struct ceph_metric_header header;
__le32 sec; struct ceph_timespec lat;
__le32 nsec; struct ceph_timespec avg;
__le64 sq_sum;
__le64 count;
} __packed; } __packed;
/* metric dentry lease header */ /* metric dentry lease header */
...@@ -140,6 +158,7 @@ struct ceph_metric { ...@@ -140,6 +158,7 @@ struct ceph_metric {
u64 size_min; u64 size_min;
u64 size_max; u64 size_max;
ktime_t latency_sum; ktime_t latency_sum;
ktime_t latency_avg;
ktime_t latency_sq_sum; ktime_t latency_sq_sum;
ktime_t latency_min; ktime_t latency_min;
ktime_t latency_max; ktime_t latency_max;
......
...@@ -121,18 +121,23 @@ static struct ceph_snap_realm *ceph_create_snap_realm( ...@@ -121,18 +121,23 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
if (!realm) if (!realm)
return ERR_PTR(-ENOMEM); return ERR_PTR(-ENOMEM);
atomic_set(&realm->nref, 1); /* for caller */ /* Do not release the global dummy snaprealm until unmounting */
if (ino == CEPH_INO_GLOBAL_SNAPREALM)
atomic_set(&realm->nref, 2);
else
atomic_set(&realm->nref, 1);
realm->ino = ino; realm->ino = ino;
INIT_LIST_HEAD(&realm->children); INIT_LIST_HEAD(&realm->children);
INIT_LIST_HEAD(&realm->child_item); INIT_LIST_HEAD(&realm->child_item);
INIT_LIST_HEAD(&realm->empty_item); INIT_LIST_HEAD(&realm->empty_item);
INIT_LIST_HEAD(&realm->dirty_item); INIT_LIST_HEAD(&realm->dirty_item);
INIT_LIST_HEAD(&realm->rebuild_item);
INIT_LIST_HEAD(&realm->inodes_with_caps); INIT_LIST_HEAD(&realm->inodes_with_caps);
spin_lock_init(&realm->inodes_with_caps_lock); spin_lock_init(&realm->inodes_with_caps_lock);
__insert_snap_realm(&mdsc->snap_realms, realm); __insert_snap_realm(&mdsc->snap_realms, realm);
mdsc->num_snap_realms++; mdsc->num_snap_realms++;
dout("create_snap_realm %llx %p\n", realm->ino, realm); dout("%s %llx %p\n", __func__, realm->ino, realm);
return realm; return realm;
} }
...@@ -156,7 +161,7 @@ static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc, ...@@ -156,7 +161,7 @@ static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
else if (ino > r->ino) else if (ino > r->ino)
n = n->rb_right; n = n->rb_right;
else { else {
dout("lookup_snap_realm %llx %p\n", r->ino, r); dout("%s %llx %p\n", __func__, r->ino, r);
return r; return r;
} }
} }
...@@ -184,7 +189,7 @@ static void __destroy_snap_realm(struct ceph_mds_client *mdsc, ...@@ -184,7 +189,7 @@ static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
{ {
lockdep_assert_held_write(&mdsc->snap_rwsem); lockdep_assert_held_write(&mdsc->snap_rwsem);
dout("__destroy_snap_realm %p %llx\n", realm, realm->ino); dout("%s %p %llx\n", __func__, realm, realm->ino);
rb_erase(&realm->node, &mdsc->snap_realms); rb_erase(&realm->node, &mdsc->snap_realms);
mdsc->num_snap_realms--; mdsc->num_snap_realms--;
...@@ -260,9 +265,14 @@ static void __cleanup_empty_realms(struct ceph_mds_client *mdsc) ...@@ -260,9 +265,14 @@ static void __cleanup_empty_realms(struct ceph_mds_client *mdsc)
spin_unlock(&mdsc->snap_empty_lock); spin_unlock(&mdsc->snap_empty_lock);
} }
void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc) void ceph_cleanup_global_and_empty_realms(struct ceph_mds_client *mdsc)
{ {
struct ceph_snap_realm *global_realm;
down_write(&mdsc->snap_rwsem); down_write(&mdsc->snap_rwsem);
global_realm = __lookup_snap_realm(mdsc, CEPH_INO_GLOBAL_SNAPREALM);
if (global_realm)
ceph_put_snap_realm(mdsc, global_realm);
__cleanup_empty_realms(mdsc); __cleanup_empty_realms(mdsc);
up_write(&mdsc->snap_rwsem); up_write(&mdsc->snap_rwsem);
} }
...@@ -292,9 +302,8 @@ static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc, ...@@ -292,9 +302,8 @@ static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc,
if (IS_ERR(parent)) if (IS_ERR(parent))
return PTR_ERR(parent); return PTR_ERR(parent);
} }
dout("adjust_snap_realm_parent %llx %p: %llx %p -> %llx %p\n", dout("%s %llx %p: %llx %p -> %llx %p\n", __func__, realm->ino,
realm->ino, realm, realm->parent_ino, realm->parent, realm, realm->parent_ino, realm->parent, parentino, parent);
parentino, parent);
if (realm->parent) { if (realm->parent) {
list_del_init(&realm->child_item); list_del_init(&realm->child_item);
ceph_put_snap_realm(mdsc, realm->parent); ceph_put_snap_realm(mdsc, realm->parent);
...@@ -320,7 +329,8 @@ static int cmpu64_rev(const void *a, const void *b) ...@@ -320,7 +329,8 @@ static int cmpu64_rev(const void *a, const void *b)
* build the snap context for a given realm. * build the snap context for a given realm.
*/ */
static int build_snap_context(struct ceph_snap_realm *realm, static int build_snap_context(struct ceph_snap_realm *realm,
struct list_head* dirty_realms) struct list_head *realm_queue,
struct list_head *dirty_realms)
{ {
struct ceph_snap_realm *parent = realm->parent; struct ceph_snap_realm *parent = realm->parent;
struct ceph_snap_context *snapc; struct ceph_snap_context *snapc;
...@@ -334,9 +344,9 @@ static int build_snap_context(struct ceph_snap_realm *realm, ...@@ -334,9 +344,9 @@ static int build_snap_context(struct ceph_snap_realm *realm,
*/ */
if (parent) { if (parent) {
if (!parent->cached_context) { if (!parent->cached_context) {
err = build_snap_context(parent, dirty_realms); /* add to the queue head */
if (err) list_add(&parent->rebuild_item, realm_queue);
goto fail; return 1;
} }
num += parent->cached_context->num_snaps; num += parent->cached_context->num_snaps;
} }
...@@ -349,9 +359,8 @@ static int build_snap_context(struct ceph_snap_realm *realm, ...@@ -349,9 +359,8 @@ static int build_snap_context(struct ceph_snap_realm *realm,
realm->cached_context->seq == realm->seq && realm->cached_context->seq == realm->seq &&
(!parent || (!parent ||
realm->cached_context->seq >= parent->cached_context->seq)) { realm->cached_context->seq >= parent->cached_context->seq)) {
dout("build_snap_context %llx %p: %p seq %lld (%u snaps)" dout("%s %llx %p: %p seq %lld (%u snaps) (unchanged)\n",
" (unchanged)\n", __func__, realm->ino, realm, realm->cached_context,
realm->ino, realm, realm->cached_context,
realm->cached_context->seq, realm->cached_context->seq,
(unsigned int)realm->cached_context->num_snaps); (unsigned int)realm->cached_context->num_snaps);
return 0; return 0;
...@@ -390,9 +399,8 @@ static int build_snap_context(struct ceph_snap_realm *realm, ...@@ -390,9 +399,8 @@ static int build_snap_context(struct ceph_snap_realm *realm,
sort(snapc->snaps, num, sizeof(u64), cmpu64_rev, NULL); sort(snapc->snaps, num, sizeof(u64), cmpu64_rev, NULL);
snapc->num_snaps = num; snapc->num_snaps = num;
dout("build_snap_context %llx %p: %p seq %lld (%u snaps)\n", dout("%s %llx %p: %p seq %lld (%u snaps)\n", __func__, realm->ino,
realm->ino, realm, snapc, snapc->seq, realm, snapc, snapc->seq, (unsigned int) snapc->num_snaps);
(unsigned int) snapc->num_snaps);
ceph_put_snap_context(realm->cached_context); ceph_put_snap_context(realm->cached_context);
realm->cached_context = snapc; realm->cached_context = snapc;
...@@ -409,8 +417,7 @@ static int build_snap_context(struct ceph_snap_realm *realm, ...@@ -409,8 +417,7 @@ static int build_snap_context(struct ceph_snap_realm *realm,
ceph_put_snap_context(realm->cached_context); ceph_put_snap_context(realm->cached_context);
realm->cached_context = NULL; realm->cached_context = NULL;
} }
pr_err("build_snap_context %llx %p fail %d\n", realm->ino, pr_err("%s %llx %p fail %d\n", __func__, realm->ino, realm, err);
realm, err);
return err; return err;
} }
...@@ -420,13 +427,50 @@ static int build_snap_context(struct ceph_snap_realm *realm, ...@@ -420,13 +427,50 @@ static int build_snap_context(struct ceph_snap_realm *realm,
static void rebuild_snap_realms(struct ceph_snap_realm *realm, static void rebuild_snap_realms(struct ceph_snap_realm *realm,
struct list_head *dirty_realms) struct list_head *dirty_realms)
{ {
struct ceph_snap_realm *child; LIST_HEAD(realm_queue);
int last = 0;
bool skip = false;
list_add_tail(&realm->rebuild_item, &realm_queue);
while (!list_empty(&realm_queue)) {
struct ceph_snap_realm *_realm, *child;
_realm = list_first_entry(&realm_queue,
struct ceph_snap_realm,
rebuild_item);
/*
* If the last build failed due to a memory
* issue, just empty the realm_queue and return
* to avoid an infinite loop.
*/
if (last < 0) {
list_del_init(&_realm->rebuild_item);
continue;
}
last = build_snap_context(_realm, &realm_queue, dirty_realms);
dout("%s %llx %p, %s\n", __func__, _realm->ino, _realm,
last > 0 ? "is deferred" : !last ? "succeeded" : "failed");
/* is any child in the list ? */
list_for_each_entry(child, &_realm->children, child_item) {
if (!list_empty(&child->rebuild_item)) {
skip = true;
break;
}
}
dout("rebuild_snap_realms %llx %p\n", realm->ino, realm); if (!skip) {
build_snap_context(realm, dirty_realms); list_for_each_entry(child, &_realm->children, child_item)
list_add_tail(&child->rebuild_item, &realm_queue);
}
list_for_each_entry(child, &realm->children, child_item) /* last == 1 means need to build parent first */
rebuild_snap_realms(child, dirty_realms); if (last <= 0)
list_del_init(&_realm->rebuild_item);
}
} }
...@@ -474,23 +518,15 @@ static bool has_new_snaps(struct ceph_snap_context *o, ...@@ -474,23 +518,15 @@ static bool has_new_snaps(struct ceph_snap_context *o,
* Caller must hold snap_rwsem for read (i.e., the realm topology won't * Caller must hold snap_rwsem for read (i.e., the realm topology won't
* change). * change).
*/ */
static void ceph_queue_cap_snap(struct ceph_inode_info *ci) static void ceph_queue_cap_snap(struct ceph_inode_info *ci,
struct ceph_cap_snap **pcapsnap)
{ {
struct inode *inode = &ci->vfs_inode; struct inode *inode = &ci->vfs_inode;
struct ceph_cap_snap *capsnap;
struct ceph_snap_context *old_snapc, *new_snapc; struct ceph_snap_context *old_snapc, *new_snapc;
struct ceph_cap_snap *capsnap = *pcapsnap;
struct ceph_buffer *old_blob = NULL; struct ceph_buffer *old_blob = NULL;
int used, dirty; int used, dirty;
capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
if (!capsnap) {
pr_err("ENOMEM allocating ceph_cap_snap on %p\n", inode);
return;
}
capsnap->cap_flush.is_capsnap = true;
INIT_LIST_HEAD(&capsnap->cap_flush.i_list);
INIT_LIST_HEAD(&capsnap->cap_flush.g_list);
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
used = __ceph_caps_used(ci); used = __ceph_caps_used(ci);
dirty = __ceph_caps_dirty(ci); dirty = __ceph_caps_dirty(ci);
...@@ -511,12 +547,14 @@ static void ceph_queue_cap_snap(struct ceph_inode_info *ci) ...@@ -511,12 +547,14 @@ static void ceph_queue_cap_snap(struct ceph_inode_info *ci)
as no new writes are allowed to start when pending, so any as no new writes are allowed to start when pending, so any
writes in progress now were started before the previous writes in progress now were started before the previous
cap_snap. lucky us. */ cap_snap. lucky us. */
dout("queue_cap_snap %p already pending\n", inode); dout("%s %p %llx.%llx already pending\n",
__func__, inode, ceph_vinop(inode));
goto update_snapc; goto update_snapc;
} }
if (ci->i_wrbuffer_ref_head == 0 && if (ci->i_wrbuffer_ref_head == 0 &&
!(dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))) { !(dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))) {
dout("queue_cap_snap %p nothing dirty|writing\n", inode); dout("%s %p %llx.%llx nothing dirty|writing\n",
__func__, inode, ceph_vinop(inode));
goto update_snapc; goto update_snapc;
} }
...@@ -536,20 +574,17 @@ static void ceph_queue_cap_snap(struct ceph_inode_info *ci) ...@@ -536,20 +574,17 @@ static void ceph_queue_cap_snap(struct ceph_inode_info *ci)
} else { } else {
if (!(used & CEPH_CAP_FILE_WR) && if (!(used & CEPH_CAP_FILE_WR) &&
ci->i_wrbuffer_ref_head == 0) { ci->i_wrbuffer_ref_head == 0) {
dout("queue_cap_snap %p " dout("%s %p %llx.%llx no new_snap|dirty_page|writing\n",
"no new_snap|dirty_page|writing\n", inode); __func__, inode, ceph_vinop(inode));
goto update_snapc; goto update_snapc;
} }
} }
dout("queue_cap_snap %p cap_snap %p queuing under %p %s %s\n", dout("%s %p %llx.%llx cap_snap %p queuing under %p %s %s\n",
inode, capsnap, old_snapc, ceph_cap_string(dirty), __func__, inode, ceph_vinop(inode), capsnap, old_snapc,
capsnap->need_flush ? "" : "no_flush"); ceph_cap_string(dirty), capsnap->need_flush ? "" : "no_flush");
ihold(inode); ihold(inode);
refcount_set(&capsnap->nref, 1);
INIT_LIST_HEAD(&capsnap->ci_item);
capsnap->follows = old_snapc->seq; capsnap->follows = old_snapc->seq;
capsnap->issued = __ceph_caps_issued(ci, NULL); capsnap->issued = __ceph_caps_issued(ci, NULL);
capsnap->dirty = dirty; capsnap->dirty = dirty;
...@@ -579,31 +614,30 @@ static void ceph_queue_cap_snap(struct ceph_inode_info *ci) ...@@ -579,31 +614,30 @@ static void ceph_queue_cap_snap(struct ceph_inode_info *ci)
list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps); list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
if (used & CEPH_CAP_FILE_WR) { if (used & CEPH_CAP_FILE_WR) {
dout("queue_cap_snap %p cap_snap %p snapc %p" dout("%s %p %llx.%llx cap_snap %p snapc %p seq %llu used WR,"
" seq %llu used WR, now pending\n", inode, " now pending\n", __func__, inode, ceph_vinop(inode),
capsnap, old_snapc, old_snapc->seq); capsnap, old_snapc, old_snapc->seq);
capsnap->writing = 1; capsnap->writing = 1;
} else { } else {
/* note mtime, size NOW. */ /* note mtime, size NOW. */
__ceph_finish_cap_snap(ci, capsnap); __ceph_finish_cap_snap(ci, capsnap);
} }
capsnap = NULL; *pcapsnap = NULL;
old_snapc = NULL; old_snapc = NULL;
update_snapc: update_snapc:
if (ci->i_wrbuffer_ref_head == 0 && if (ci->i_wrbuffer_ref_head == 0 &&
ci->i_wr_ref == 0 && ci->i_wr_ref == 0 &&
ci->i_dirty_caps == 0 && ci->i_dirty_caps == 0 &&
ci->i_flushing_caps == 0) { ci->i_flushing_caps == 0) {
ci->i_head_snapc = NULL; ci->i_head_snapc = NULL;
} else { } else {
ci->i_head_snapc = ceph_get_snap_context(new_snapc); ci->i_head_snapc = ceph_get_snap_context(new_snapc);
dout(" new snapc is %p\n", new_snapc); dout(" new snapc is %p\n", new_snapc);
} }
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
ceph_buffer_put(old_blob); ceph_buffer_put(old_blob);
kfree(capsnap);
ceph_put_snap_context(old_snapc); ceph_put_snap_context(old_snapc);
} }
...@@ -632,27 +666,28 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci, ...@@ -632,27 +666,28 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
capsnap->truncate_size = ci->i_truncate_size; capsnap->truncate_size = ci->i_truncate_size;
capsnap->truncate_seq = ci->i_truncate_seq; capsnap->truncate_seq = ci->i_truncate_seq;
if (capsnap->dirty_pages) { if (capsnap->dirty_pages) {
dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu " dout("%s %p %llx.%llx cap_snap %p snapc %p %llu %s s=%llu "
"still has %d dirty pages\n", inode, capsnap, "still has %d dirty pages\n", __func__, inode,
capsnap->context, capsnap->context->seq, ceph_vinop(inode), capsnap, capsnap->context,
ceph_cap_string(capsnap->dirty), capsnap->size, capsnap->context->seq, ceph_cap_string(capsnap->dirty),
capsnap->dirty_pages); capsnap->size, capsnap->dirty_pages);
return 0; return 0;
} }
/* Fb cap still in use, delay it */ /* Fb cap still in use, delay it */
if (ci->i_wb_ref) { if (ci->i_wb_ref) {
dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu " dout("%s %p %llx.%llx cap_snap %p snapc %p %llu %s s=%llu "
"used WRBUFFER, delaying\n", inode, capsnap, "used WRBUFFER, delaying\n", __func__, inode,
capsnap->context, capsnap->context->seq, ceph_vinop(inode), capsnap, capsnap->context,
ceph_cap_string(capsnap->dirty), capsnap->size); capsnap->context->seq, ceph_cap_string(capsnap->dirty),
capsnap->size);
capsnap->writing = 1; capsnap->writing = 1;
return 0; return 0;
} }
ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS; ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n", dout("%s %p %llx.%llx cap_snap %p snapc %p %llu %s s=%llu\n",
inode, capsnap, capsnap->context, __func__, inode, ceph_vinop(inode), capsnap, capsnap->context,
capsnap->context->seq, ceph_cap_string(capsnap->dirty), capsnap->context->seq, ceph_cap_string(capsnap->dirty),
capsnap->size); capsnap->size);
...@@ -671,8 +706,9 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm) ...@@ -671,8 +706,9 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
{ {
struct ceph_inode_info *ci; struct ceph_inode_info *ci;
struct inode *lastinode = NULL; struct inode *lastinode = NULL;
struct ceph_cap_snap *capsnap = NULL;
dout("queue_realm_cap_snaps %p %llx inodes\n", realm, realm->ino); dout("%s %p %llx inode\n", __func__, realm, realm->ino);
spin_lock(&realm->inodes_with_caps_lock); spin_lock(&realm->inodes_with_caps_lock);
list_for_each_entry(ci, &realm->inodes_with_caps, i_snap_realm_item) { list_for_each_entry(ci, &realm->inodes_with_caps, i_snap_realm_item) {
...@@ -682,13 +718,35 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm) ...@@ -682,13 +718,35 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
spin_unlock(&realm->inodes_with_caps_lock); spin_unlock(&realm->inodes_with_caps_lock);
iput(lastinode); iput(lastinode);
lastinode = inode; lastinode = inode;
ceph_queue_cap_snap(ci);
/*
* Allocate the capsnap memory outside of ceph_queue_cap_snap()
* to avoid likely but unnecessary frequent memory
* allocation and freeing in this loop.
*/
if (!capsnap) {
capsnap = kmem_cache_zalloc(ceph_cap_snap_cachep, GFP_NOFS);
if (!capsnap) {
pr_err("ENOMEM allocating ceph_cap_snap on %p\n",
inode);
return;
}
}
capsnap->cap_flush.is_capsnap = true;
refcount_set(&capsnap->nref, 1);
INIT_LIST_HEAD(&capsnap->cap_flush.i_list);
INIT_LIST_HEAD(&capsnap->cap_flush.g_list);
INIT_LIST_HEAD(&capsnap->ci_item);
ceph_queue_cap_snap(ci, &capsnap);
spin_lock(&realm->inodes_with_caps_lock); spin_lock(&realm->inodes_with_caps_lock);
} }
spin_unlock(&realm->inodes_with_caps_lock); spin_unlock(&realm->inodes_with_caps_lock);
iput(lastinode); iput(lastinode);
dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino); if (capsnap)
kmem_cache_free(ceph_cap_snap_cachep, capsnap);
dout("%s %p %llx done\n", __func__, realm, realm->ino);
} }
/* /*
...@@ -707,14 +765,16 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc, ...@@ -707,14 +765,16 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
__le64 *prior_parent_snaps; /* encoded */ __le64 *prior_parent_snaps; /* encoded */
struct ceph_snap_realm *realm = NULL; struct ceph_snap_realm *realm = NULL;
struct ceph_snap_realm *first_realm = NULL; struct ceph_snap_realm *first_realm = NULL;
int invalidate = 0; struct ceph_snap_realm *realm_to_rebuild = NULL;
int rebuild_snapcs;
int err = -ENOMEM; int err = -ENOMEM;
LIST_HEAD(dirty_realms); LIST_HEAD(dirty_realms);
lockdep_assert_held_write(&mdsc->snap_rwsem); lockdep_assert_held_write(&mdsc->snap_rwsem);
dout("update_snap_trace deletion=%d\n", deletion); dout("%s deletion=%d\n", __func__, deletion);
more: more:
rebuild_snapcs = 0;
ceph_decode_need(&p, e, sizeof(*ri), bad); ceph_decode_need(&p, e, sizeof(*ri), bad);
ri = p; ri = p;
p += sizeof(*ri); p += sizeof(*ri);
...@@ -738,10 +798,10 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc, ...@@ -738,10 +798,10 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent)); err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent));
if (err < 0) if (err < 0)
goto fail; goto fail;
invalidate += err; rebuild_snapcs += err;
if (le64_to_cpu(ri->seq) > realm->seq) { if (le64_to_cpu(ri->seq) > realm->seq) {
dout("update_snap_trace updating %llx %p %lld -> %lld\n", dout("%s updating %llx %p %lld -> %lld\n", __func__,
realm->ino, realm, realm->seq, le64_to_cpu(ri->seq)); realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
/* update realm parameters, snap lists */ /* update realm parameters, snap lists */
realm->seq = le64_to_cpu(ri->seq); realm->seq = le64_to_cpu(ri->seq);
...@@ -763,22 +823,30 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc, ...@@ -763,22 +823,30 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
if (realm->seq > mdsc->last_snap_seq) if (realm->seq > mdsc->last_snap_seq)
mdsc->last_snap_seq = realm->seq; mdsc->last_snap_seq = realm->seq;
invalidate = 1; rebuild_snapcs = 1;
} else if (!realm->cached_context) { } else if (!realm->cached_context) {
dout("update_snap_trace %llx %p seq %lld new\n", dout("%s %llx %p seq %lld new\n", __func__,
realm->ino, realm, realm->seq); realm->ino, realm, realm->seq);
invalidate = 1; rebuild_snapcs = 1;
} else { } else {
dout("update_snap_trace %llx %p seq %lld unchanged\n", dout("%s %llx %p seq %lld unchanged\n", __func__,
realm->ino, realm, realm->seq); realm->ino, realm, realm->seq);
} }
dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino, dout("done with %llx %p, rebuild_snapcs=%d, %p %p\n", realm->ino,
realm, invalidate, p, e); realm, rebuild_snapcs, p, e);
/*
* this will always track the topmost parent realm from which
* we need to rebuild the snapshot contexts _downward_ in the
* hierarchy.
*/
if (rebuild_snapcs)
realm_to_rebuild = realm;
/* invalidate when we reach the _end_ (root) of the trace */ /* rebuild_snapcs when we reach the _end_ (root) of the trace */
if (invalidate && p >= e) if (realm_to_rebuild && p >= e)
rebuild_snap_realms(realm, &dirty_realms); rebuild_snap_realms(realm_to_rebuild, &dirty_realms);
if (!first_realm) if (!first_realm)
first_realm = realm; first_realm = realm;
...@@ -814,7 +882,7 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc, ...@@ -814,7 +882,7 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
ceph_put_snap_realm(mdsc, realm); ceph_put_snap_realm(mdsc, realm);
if (first_realm) if (first_realm)
ceph_put_snap_realm(mdsc, first_realm); ceph_put_snap_realm(mdsc, first_realm);
pr_err("update_snap_trace error %d\n", err); pr_err("%s error %d\n", __func__, err);
return err; return err;
} }
...@@ -831,7 +899,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc) ...@@ -831,7 +899,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
struct inode *inode; struct inode *inode;
struct ceph_mds_session *session = NULL; struct ceph_mds_session *session = NULL;
dout("flush_snaps\n"); dout("%s\n", __func__);
spin_lock(&mdsc->snap_flush_lock); spin_lock(&mdsc->snap_flush_lock);
while (!list_empty(&mdsc->snap_flush_list)) { while (!list_empty(&mdsc->snap_flush_list)) {
ci = list_first_entry(&mdsc->snap_flush_list, ci = list_first_entry(&mdsc->snap_flush_list,
...@@ -846,7 +914,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc) ...@@ -846,7 +914,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
spin_unlock(&mdsc->snap_flush_lock); spin_unlock(&mdsc->snap_flush_lock);
ceph_put_mds_session(session); ceph_put_mds_session(session);
dout("flush_snaps done\n"); dout("%s done\n", __func__);
} }
/** /**
...@@ -928,8 +996,8 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, ...@@ -928,8 +996,8 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
trace_len = le32_to_cpu(h->trace_len); trace_len = le32_to_cpu(h->trace_len);
p += sizeof(*h); p += sizeof(*h);
dout("handle_snap from mds%d op %s split %llx tracelen %d\n", mds, dout("%s from mds%d op %s split %llx tracelen %d\n", __func__,
ceph_snap_op_name(op), split, trace_len); mds, ceph_snap_op_name(op), split, trace_len);
mutex_lock(&session->s_mutex); mutex_lock(&session->s_mutex);
inc_session_sequence(session); inc_session_sequence(session);
...@@ -989,13 +1057,13 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, ...@@ -989,13 +1057,13 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
*/ */
if (ci->i_snap_realm->created > if (ci->i_snap_realm->created >
le64_to_cpu(ri->created)) { le64_to_cpu(ri->created)) {
dout(" leaving %p in newer realm %llx %p\n", dout(" leaving %p %llx.%llx in newer realm %llx %p\n",
inode, ci->i_snap_realm->ino, inode, ceph_vinop(inode), ci->i_snap_realm->ino,
ci->i_snap_realm); ci->i_snap_realm);
goto skip_inode; goto skip_inode;
} }
dout(" will move %p to split realm %llx %p\n", dout(" will move %p %llx.%llx to split realm %llx %p\n",
inode, realm->ino, realm); inode, ceph_vinop(inode), realm->ino, realm);
ceph_get_snap_realm(mdsc, realm); ceph_get_snap_realm(mdsc, realm);
ceph_change_snap_realm(inode, realm); ceph_change_snap_realm(inode, realm);
...@@ -1038,7 +1106,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, ...@@ -1038,7 +1106,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
return; return;
bad: bad:
pr_err("corrupt snap message from mds%d\n", mds); pr_err("%s corrupt snap message from mds%d\n", __func__, mds);
ceph_msg_dump(msg); ceph_msg_dump(msg);
out: out:
if (locked_rwsem) if (locked_rwsem)
...@@ -1071,7 +1139,8 @@ struct ceph_snapid_map* ceph_get_snapid_map(struct ceph_mds_client *mdsc, ...@@ -1071,7 +1139,8 @@ struct ceph_snapid_map* ceph_get_snapid_map(struct ceph_mds_client *mdsc,
} }
spin_unlock(&mdsc->snapid_map_lock); spin_unlock(&mdsc->snapid_map_lock);
if (exist) { if (exist) {
dout("found snapid map %llx -> %x\n", exist->snap, exist->dev); dout("%s found snapid map %llx -> %x\n", __func__,
exist->snap, exist->dev);
return exist; return exist;
} }
...@@ -1115,11 +1184,13 @@ struct ceph_snapid_map* ceph_get_snapid_map(struct ceph_mds_client *mdsc, ...@@ -1115,11 +1184,13 @@ struct ceph_snapid_map* ceph_get_snapid_map(struct ceph_mds_client *mdsc,
if (exist) { if (exist) {
free_anon_bdev(sm->dev); free_anon_bdev(sm->dev);
kfree(sm); kfree(sm);
dout("found snapid map %llx -> %x\n", exist->snap, exist->dev); dout("%s found snapid map %llx -> %x\n", __func__,
exist->snap, exist->dev);
return exist; return exist;
} }
dout("create snapid map %llx -> %x\n", sm->snap, sm->dev); dout("%s create snapid map %llx -> %x\n", __func__,
sm->snap, sm->dev);
return sm; return sm;
} }
......
...@@ -60,6 +60,7 @@ const char *ceph_mds_op_name(int op) ...@@ -60,6 +60,7 @@ const char *ceph_mds_op_name(int op)
case CEPH_MDS_OP_LOOKUPINO: return "lookupino"; case CEPH_MDS_OP_LOOKUPINO: return "lookupino";
case CEPH_MDS_OP_LOOKUPNAME: return "lookupname"; case CEPH_MDS_OP_LOOKUPNAME: return "lookupname";
case CEPH_MDS_OP_GETATTR: return "getattr"; case CEPH_MDS_OP_GETATTR: return "getattr";
case CEPH_MDS_OP_GETVXATTR: return "getvxattr";
case CEPH_MDS_OP_SETXATTR: return "setxattr"; case CEPH_MDS_OP_SETXATTR: return "setxattr";
case CEPH_MDS_OP_SETATTR: return "setattr"; case CEPH_MDS_OP_SETATTR: return "setattr";
case CEPH_MDS_OP_RMXATTR: return "rmxattr"; case CEPH_MDS_OP_RMXATTR: return "rmxattr";
......
...@@ -865,6 +865,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc) ...@@ -865,6 +865,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
*/ */
struct kmem_cache *ceph_inode_cachep; struct kmem_cache *ceph_inode_cachep;
struct kmem_cache *ceph_cap_cachep; struct kmem_cache *ceph_cap_cachep;
struct kmem_cache *ceph_cap_snap_cachep;
struct kmem_cache *ceph_cap_flush_cachep; struct kmem_cache *ceph_cap_flush_cachep;
struct kmem_cache *ceph_dentry_cachep; struct kmem_cache *ceph_dentry_cachep;
struct kmem_cache *ceph_file_cachep; struct kmem_cache *ceph_file_cachep;
...@@ -893,6 +894,9 @@ static int __init init_caches(void) ...@@ -893,6 +894,9 @@ static int __init init_caches(void)
ceph_cap_cachep = KMEM_CACHE(ceph_cap, SLAB_MEM_SPREAD); ceph_cap_cachep = KMEM_CACHE(ceph_cap, SLAB_MEM_SPREAD);
if (!ceph_cap_cachep) if (!ceph_cap_cachep)
goto bad_cap; goto bad_cap;
ceph_cap_snap_cachep = KMEM_CACHE(ceph_cap_snap, SLAB_MEM_SPREAD);
if (!ceph_cap_snap_cachep)
goto bad_cap_snap;
ceph_cap_flush_cachep = KMEM_CACHE(ceph_cap_flush, ceph_cap_flush_cachep = KMEM_CACHE(ceph_cap_flush,
SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD); SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD);
if (!ceph_cap_flush_cachep) if (!ceph_cap_flush_cachep)
...@@ -932,6 +936,8 @@ static int __init init_caches(void) ...@@ -932,6 +936,8 @@ static int __init init_caches(void)
bad_dentry: bad_dentry:
kmem_cache_destroy(ceph_cap_flush_cachep); kmem_cache_destroy(ceph_cap_flush_cachep);
bad_cap_flush: bad_cap_flush:
kmem_cache_destroy(ceph_cap_snap_cachep);
bad_cap_snap:
kmem_cache_destroy(ceph_cap_cachep); kmem_cache_destroy(ceph_cap_cachep);
bad_cap: bad_cap:
kmem_cache_destroy(ceph_inode_cachep); kmem_cache_destroy(ceph_inode_cachep);
...@@ -948,6 +954,7 @@ static void destroy_caches(void) ...@@ -948,6 +954,7 @@ static void destroy_caches(void)
kmem_cache_destroy(ceph_inode_cachep); kmem_cache_destroy(ceph_inode_cachep);
kmem_cache_destroy(ceph_cap_cachep); kmem_cache_destroy(ceph_cap_cachep);
kmem_cache_destroy(ceph_cap_snap_cachep);
kmem_cache_destroy(ceph_cap_flush_cachep); kmem_cache_destroy(ceph_cap_flush_cachep);
kmem_cache_destroy(ceph_dentry_cachep); kmem_cache_destroy(ceph_dentry_cachep);
kmem_cache_destroy(ceph_file_cachep); kmem_cache_destroy(ceph_file_cachep);
......
...@@ -231,7 +231,7 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) ...@@ -231,7 +231,7 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
if (refcount_dec_and_test(&capsnap->nref)) { if (refcount_dec_and_test(&capsnap->nref)) {
if (capsnap->xattr_blob) if (capsnap->xattr_blob)
ceph_buffer_put(capsnap->xattr_blob); ceph_buffer_put(capsnap->xattr_blob);
kfree(capsnap); kmem_cache_free(ceph_cap_snap_cachep, capsnap);
} }
} }
...@@ -884,6 +884,8 @@ struct ceph_snap_realm { ...@@ -884,6 +884,8 @@ struct ceph_snap_realm {
struct list_head dirty_item; /* if realm needs new context */ struct list_head dirty_item; /* if realm needs new context */
struct list_head rebuild_item; /* rebuild snap realms _downward_ in hierarchy */
/* the current set of snaps for this realm */ /* the current set of snaps for this realm */
struct ceph_snap_context *cached_context; struct ceph_snap_context *cached_context;
...@@ -939,7 +941,7 @@ extern void ceph_handle_snap(struct ceph_mds_client *mdsc, ...@@ -939,7 +941,7 @@ extern void ceph_handle_snap(struct ceph_mds_client *mdsc,
struct ceph_msg *msg); struct ceph_msg *msg);
extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci, extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
struct ceph_cap_snap *capsnap); struct ceph_cap_snap *capsnap);
extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc); extern void ceph_cleanup_global_and_empty_realms(struct ceph_mds_client *mdsc);
extern struct ceph_snapid_map *ceph_get_snapid_map(struct ceph_mds_client *mdsc, extern struct ceph_snapid_map *ceph_get_snapid_map(struct ceph_mds_client *mdsc,
u64 snap); u64 snap);
...@@ -1049,6 +1051,7 @@ static inline bool ceph_inode_is_shutdown(struct inode *inode) ...@@ -1049,6 +1051,7 @@ static inline bool ceph_inode_is_shutdown(struct inode *inode)
/* xattr.c */ /* xattr.c */
int __ceph_setxattr(struct inode *, const char *, const void *, size_t, int); int __ceph_setxattr(struct inode *, const char *, const void *, size_t, int);
int ceph_do_getvxattr(struct inode *inode, const char *name, void *value, size_t size);
ssize_t __ceph_getxattr(struct inode *, const char *, void *, size_t); ssize_t __ceph_getxattr(struct inode *, const char *, void *, size_t);
extern ssize_t ceph_listxattr(struct dentry *, char *, size_t); extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
extern struct ceph_buffer *__ceph_build_xattrs_blob(struct ceph_inode_info *ci); extern struct ceph_buffer *__ceph_build_xattrs_blob(struct ceph_inode_info *ci);
...@@ -1214,7 +1217,7 @@ extern void __ceph_touch_fmode(struct ceph_inode_info *ci, ...@@ -1214,7 +1217,7 @@ extern void __ceph_touch_fmode(struct ceph_inode_info *ci,
/* addr.c */ /* addr.c */
extern const struct address_space_operations ceph_aops; extern const struct address_space_operations ceph_aops;
extern int ceph_mmap(struct file *file, struct vm_area_struct *vma); extern int ceph_mmap(struct file *file, struct vm_area_struct *vma);
extern int ceph_uninline_data(struct file *filp, struct page *locked_page); extern int ceph_uninline_data(struct file *file);
extern int ceph_pool_perm_check(struct inode *inode, int need); extern int ceph_pool_perm_check(struct inode *inode, int need);
extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc); extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc);
int ceph_purge_inode_cap(struct inode *inode, struct ceph_cap *cap, bool *invalidate); int ceph_purge_inode_cap(struct inode *inode, struct ceph_cap *cap, bool *invalidate);
......
...@@ -923,10 +923,13 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value, ...@@ -923,10 +923,13 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
{ {
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_inode_xattr *xattr; struct ceph_inode_xattr *xattr;
struct ceph_vxattr *vxattr = NULL; struct ceph_vxattr *vxattr;
int req_mask; int req_mask;
ssize_t err; ssize_t err;
if (strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN))
goto handle_non_vxattrs;
/* let's see if a virtual xattr was requested */ /* let's see if a virtual xattr was requested */
vxattr = ceph_match_vxattr(inode, name); vxattr = ceph_match_vxattr(inode, name);
if (vxattr) { if (vxattr) {
...@@ -945,8 +948,14 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value, ...@@ -945,8 +948,14 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
err = -ERANGE; err = -ERANGE;
} }
return err; return err;
} else {
err = ceph_do_getvxattr(inode, name, value, size);
/* this would happen with a new client and old server combo */
if (err == -EOPNOTSUPP)
err = -ENODATA;
return err;
} }
handle_non_vxattrs:
req_mask = __get_request_mask(inode); req_mask = __get_request_mask(inode);
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
......
...@@ -28,8 +28,8 @@ ...@@ -28,8 +28,8 @@
#define CEPH_INO_ROOT 1 #define CEPH_INO_ROOT 1
#define CEPH_INO_CEPH 2 /* hidden .ceph dir */ #define CEPH_INO_CEPH 2 /* hidden .ceph dir */
#define CEPH_INO_DOTDOT 3 /* used by ceph fuse for parent (..) */ #define CEPH_INO_GLOBAL_SNAPREALM 3 /* global dummy snaprealm */
/* arbitrary limit on max # of monitors (cluster of 3 is typical) */ /* arbitrary limit on max # of monitors (cluster of 3 is typical) */
#define CEPH_MAX_MON 31 #define CEPH_MAX_MON 31
...@@ -328,6 +328,7 @@ enum { ...@@ -328,6 +328,7 @@ enum {
CEPH_MDS_OP_LOOKUPPARENT = 0x00103, CEPH_MDS_OP_LOOKUPPARENT = 0x00103,
CEPH_MDS_OP_LOOKUPINO = 0x00104, CEPH_MDS_OP_LOOKUPINO = 0x00104,
CEPH_MDS_OP_LOOKUPNAME = 0x00105, CEPH_MDS_OP_LOOKUPNAME = 0x00105,
CEPH_MDS_OP_GETVXATTR = 0x00106,
CEPH_MDS_OP_SETXATTR = 0x01105, CEPH_MDS_OP_SETXATTR = 0x01105,
CEPH_MDS_OP_RMXATTR = 0x01106, CEPH_MDS_OP_RMXATTR = 0x01106,
......
...@@ -284,6 +284,7 @@ DEFINE_RB_LOOKUP_FUNC(name, type, keyfld, nodefld) ...@@ -284,6 +284,7 @@ DEFINE_RB_LOOKUP_FUNC(name, type, keyfld, nodefld)
extern struct kmem_cache *ceph_inode_cachep; extern struct kmem_cache *ceph_inode_cachep;
extern struct kmem_cache *ceph_cap_cachep; extern struct kmem_cache *ceph_cap_cachep;
extern struct kmem_cache *ceph_cap_snap_cachep;
extern struct kmem_cache *ceph_cap_flush_cachep; extern struct kmem_cache *ceph_cap_flush_cachep;
extern struct kmem_cache *ceph_dentry_cachep; extern struct kmem_cache *ceph_dentry_cachep;
extern struct kmem_cache *ceph_file_cachep; extern struct kmem_cache *ceph_file_cachep;
......
...@@ -1773,10 +1773,8 @@ static int prepare_read_data(struct ceph_connection *con) ...@@ -1773,10 +1773,8 @@ static int prepare_read_data(struct ceph_connection *con)
bv.bv_page = con->bounce_page; bv.bv_page = con->bounce_page;
bv.bv_offset = 0; bv.bv_offset = 0;
set_in_bvec(con, &bv);
} else {
set_in_bvec(con, &bv);
} }
set_in_bvec(con, &bv);
con->v2.in_state = IN_S_PREPARE_READ_DATA_CONT; con->v2.in_state = IN_S_PREPARE_READ_DATA_CONT;
return 0; return 0;
} }
...@@ -1807,10 +1805,8 @@ static void prepare_read_data_cont(struct ceph_connection *con) ...@@ -1807,10 +1805,8 @@ static void prepare_read_data_cont(struct ceph_connection *con)
if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) { if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
bv.bv_page = con->bounce_page; bv.bv_page = con->bounce_page;
bv.bv_offset = 0; bv.bv_offset = 0;
set_in_bvec(con, &bv);
} else {
set_in_bvec(con, &bv);
} }
set_in_bvec(con, &bv);
WARN_ON(con->v2.in_state != IN_S_PREPARE_READ_DATA_CONT); WARN_ON(con->v2.in_state != IN_S_PREPARE_READ_DATA_CONT);
return; return;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment