Commit 47a7ce62 authored by Linus Torvalds

Merge tag 'ceph-for-5.14-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "We have new filesystem client metrics for reporting I/O sizes from
  Xiubo, two patchsets from Jeff that begin to untangle some heavyweight
  blocking locks in the filesystem and a bunch of code cleanups"

* tag 'ceph-for-5.14-rc1' of git://github.com/ceph/ceph-client:
  ceph: take reference to req->r_parent at point of assignment
  ceph: eliminate ceph_async_iput()
  ceph: don't take s_mutex in ceph_flush_snaps
  ceph: don't take s_mutex in try_flush_caps
  ceph: don't take s_mutex or snap_rwsem in ceph_check_caps
  ceph: eliminate session->s_gen_ttl_lock
  ceph: allow ceph_put_mds_session to take NULL or ERR_PTR
  ceph: clean up locking annotation for ceph_get_snap_realm and __lookup_snap_realm
  ceph: add some lockdep assertions around snaprealm handling
  ceph: decoding error in ceph_update_snap_realm should return -EIO
  ceph: add IO size metrics support
  ceph: update and rename __update_latency helper to __update_stdev
  ceph: simplify the metrics struct
  libceph: fix doc warnings in cls_lock_client.c
  libceph: remove unnecessary ret variable in ceph_auth_init()
  libceph: fix some spelling mistakes
  libceph: kill ceph_none_authorizer::reply_buf
  ceph: make ceph_queue_cap_snap static
  ceph: make ceph_netfs_read_ops static
  ceph: remove bogus checks and WARN_ONs from ceph_set_page_dirty
parents 96890bc2 4c183472
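The I/O size metrics added in this diff keep three running values per direction (minimum, maximum, and byte sum) next to the existing op count, and derive the average only when the debugfs file is read, using a round-to-nearest division. A minimal userspace sketch of that bookkeeping follows; the names are illustrative stand-ins, not the kernel's structures:

#include <stdint.h>
#include <stdio.h>

struct io_size_metric {
    uint64_t total;      /* number of completed ops */
    uint64_t size_min;
    uint64_t size_max;
    uint64_t size_sum;
};

/* Called once per completed read or write with its byte count. */
static void metric_update_size(struct io_size_metric *m, uint64_t size)
{
    m->total++;
    if (m->total == 1 || size < m->size_min)
        m->size_min = size;
    if (size > m->size_max)
        m->size_max = size;
    m->size_sum += size;
}

/* Average derived at display time, rounded to nearest,
 * matching DIV64_U64_ROUND_CLOSEST(sum, total) in metric_show(). */
static uint64_t metric_avg_size(const struct io_size_metric *m)
{
    return m->total ? (m->size_sum + m->total / 2) / m->total : 0;
}

int main(void)
{
    struct io_size_metric rd = {0};

    metric_update_size(&rd, 4096);
    metric_update_size(&rd, 65536);
    printf("avg %llu min %llu max %llu\n",
           (unsigned long long)metric_avg_size(&rd),
           (unsigned long long)rd.size_min,
           (unsigned long long)rd.size_max);
    return 0;
}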
@@ -82,10 +82,6 @@ static int ceph_set_page_dirty(struct page *page)
     struct inode *inode;
     struct ceph_inode_info *ci;
     struct ceph_snap_context *snapc;
-    int ret;
-
-    if (unlikely(!mapping))
-        return !TestSetPageDirty(page);
 
     if (PageDirty(page)) {
         dout("%p set_page_dirty %p idx %lu -- already dirty\n",
@@ -130,11 +126,7 @@ static int ceph_set_page_dirty(struct page *page)
     BUG_ON(PagePrivate(page));
     attach_page_private(page, snapc);
 
-    ret = __set_page_dirty_nobuffers(page);
-    WARN_ON(!PageLocked(page));
-    WARN_ON(!page->mapping);
-
-    return ret;
+    return __set_page_dirty_nobuffers(page);
 }
 
 /*
@@ -226,7 +218,7 @@ static void finish_netfs_read(struct ceph_osd_request *req)
     int err = req->r_result;
 
     ceph_update_read_metrics(&fsc->mdsc->metric, req->r_start_latency,
-                             req->r_end_latency, err);
+                             req->r_end_latency, osd_data->length, err);
 
     dout("%s: result %d subreq->len=%zu i_size=%lld\n", __func__, req->r_result,
          subreq->len, i_size_read(req->r_inode));
@@ -313,7 +305,7 @@ static void ceph_readahead_cleanup(struct address_space *mapping, void *priv)
         ceph_put_cap_refs(ci, got);
 }
 
-const struct netfs_read_request_ops ceph_netfs_read_ops = {
+static const struct netfs_read_request_ops ceph_netfs_read_ops = {
     .init_rreq              = ceph_init_rreq,
     .is_cache_enabled       = ceph_is_cache_enabled,
     .begin_cache_operation  = ceph_begin_cache_operation,
@@ -560,7 +552,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
     err = ceph_osdc_wait_request(osdc, req);
     ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
-                              req->r_end_latency, err);
+                              req->r_end_latency, len, err);
 
     ceph_osdc_put_request(req);
     if (err == 0)
@@ -635,6 +627,7 @@ static void writepages_finish(struct ceph_osd_request *req)
     struct ceph_snap_context *snapc = req->r_snapc;
     struct address_space *mapping = inode->i_mapping;
     struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+    unsigned int len = 0;
     bool remove_page;
 
     dout("writepages_finish %p rc %d\n", inode, rc);
@@ -647,9 +640,6 @@ static void writepages_finish(struct ceph_osd_request *req)
         ceph_clear_error_write(ci);
     }
 
-    ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
-                              req->r_end_latency, rc);
-
     /*
      * We lost the cache cap, need to truncate the page before
      * it is unlocked, otherwise we'd truncate it later in the
@@ -666,6 +656,7 @@ static void writepages_finish(struct ceph_osd_request *req)
         osd_data = osd_req_op_extent_osd_data(req, i);
         BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
+        len += osd_data->length;
         num_pages = calc_pages_for((u64)osd_data->alignment,
                                    (u64)osd_data->length);
         total_pages += num_pages;
@@ -696,6 +687,9 @@ static void writepages_finish(struct ceph_osd_request *req)
         release_pages(osd_data->pages, num_pages);
     }
 
+    ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
+                              req->r_end_latency, len, rc);
+
     ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
 
     osd_data = osd_req_op_extent_osd_data(req, 0);
@@ -1711,7 +1705,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
     err = ceph_osdc_wait_request(&fsc->client->osdc, req);
     ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
-                              req->r_end_latency, err);
+                              req->r_end_latency, len, err);
 out_put:
     ceph_osdc_put_request(req);
...
@@ -645,9 +645,7 @@ void ceph_add_cap(struct inode *inode,
     dout("add_cap %p mds%d cap %llx %s seq %d\n", inode,
          session->s_mds, cap_id, ceph_cap_string(issued), seq);
 
-    spin_lock(&session->s_gen_ttl_lock);
-    gen = session->s_cap_gen;
-    spin_unlock(&session->s_gen_ttl_lock);
+    gen = atomic_read(&session->s_cap_gen);
 
     cap = __get_cap_for_mds(ci, mds);
     if (!cap) {
@@ -785,10 +783,8 @@ static int __cap_is_valid(struct ceph_cap *cap)
     unsigned long ttl;
     u32 gen;
 
-    spin_lock(&cap->session->s_gen_ttl_lock);
-    gen = cap->session->s_cap_gen;
+    gen = atomic_read(&cap->session->s_cap_gen);
     ttl = cap->session->s_cap_ttl;
-    spin_unlock(&cap->session->s_gen_ttl_lock);
 
     if (cap->cap_gen < gen || time_after_eq(jiffies, ttl)) {
         dout("__cap_is_valid %p cap %p issued %s "
@@ -1182,7 +1178,8 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
      * s_cap_gen while session is in the reconnect state.
      */
     if (queue_release &&
-        (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
+        (!session->s_cap_reconnect ||
+         cap->cap_gen == atomic_read(&session->s_cap_gen))) {
         cap->queue_release = 1;
         if (removed) {
             __ceph_queue_cap_release(session, cap);
@@ -1534,7 +1531,7 @@ static inline int __send_flush_snap(struct inode *inode,
  * asynchronously back to the MDS once sync writes complete and dirty
  * data is written out.
  *
- * Called under i_ceph_lock. Takes s_mutex as needed.
+ * Called under i_ceph_lock.
  */
 static void __ceph_flush_snaps(struct ceph_inode_info *ci,
                                struct ceph_mds_session *session)
@@ -1656,7 +1653,6 @@ void ceph_flush_snaps(struct ceph_inode_info *ci,
     mds = ci->i_auth_cap->session->s_mds;
     if (session && session->s_mds != mds) {
         dout(" oops, wrong session %p mutex\n", session);
-        mutex_unlock(&session->s_mutex);
         ceph_put_mds_session(session);
         session = NULL;
     }
@@ -1665,10 +1661,6 @@ void ceph_flush_snaps(struct ceph_inode_info *ci,
         mutex_lock(&mdsc->mutex);
         session = __ceph_lookup_mds_session(mdsc, mds);
         mutex_unlock(&mdsc->mutex);
-        if (session) {
-            dout(" inverting session/ino locks on %p\n", session);
-            mutex_lock(&session->s_mutex);
-        }
         goto retry;
     }
@@ -1680,12 +1672,10 @@ void ceph_flush_snaps(struct ceph_inode_info *ci,
 out:
     spin_unlock(&ci->i_ceph_lock);
 
-    if (psession) {
+    if (psession)
         *psession = session;
-    } else if (session) {
-        mutex_unlock(&session->s_mutex);
+    else
         ceph_put_mds_session(session);
-    }
     /* we flushed them all; remove this inode from the queue */
     spin_lock(&mdsc->snap_flush_lock);
     list_del_init(&ci->i_snap_flush_item);
@@ -1915,7 +1905,6 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
     struct ceph_cap *cap;
     u64 flush_tid, oldest_flush_tid;
     int file_wanted, used, cap_used;
-    int took_snap_rwsem = 0;  /* true if mdsc->snap_rwsem held */
     int issued, implemented, want, retain, revoking, flushing = 0;
     int mds = -1;   /* keep track of how far we've gone through i_caps list
                        to avoid an infinite loop on retry */
@@ -1923,14 +1912,13 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
     bool queue_invalidate = false;
     bool tried_invalidate = false;
 
+    if (session)
+        ceph_get_mds_session(session);
+
     spin_lock(&ci->i_ceph_lock);
     if (ci->i_ceph_flags & CEPH_I_FLUSH)
         flags |= CHECK_CAPS_FLUSH;
-
-    goto retry_locked;
 retry:
-    spin_lock(&ci->i_ceph_lock);
-retry_locked:
     /* Caps wanted by virtue of active open files. */
     file_wanted = __ceph_caps_file_wanted(ci);
@@ -2010,7 +1998,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
             ci->i_rdcache_revoking = ci->i_rdcache_gen;
         }
         tried_invalidate = true;
-        goto retry_locked;
+        goto retry;
     }
 
     for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
@@ -2024,8 +2012,6 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
         ((flags & CHECK_CAPS_AUTHONLY) && cap != ci->i_auth_cap))
             continue;
 
-        /* NOTE: no side-effects allowed, until we take s_mutex */
-
         /*
          * If we have an auth cap, we don't need to consider any
          * overlapping caps as used.
@@ -2088,37 +2074,8 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
             continue;     /* nope, all good */
 
 ack:
-        if (session && session != cap->session) {
-            dout("oops, wrong session %p mutex\n", session);
-            mutex_unlock(&session->s_mutex);
-            session = NULL;
-        }
-        if (!session) {
-            session = cap->session;
-            if (mutex_trylock(&session->s_mutex) == 0) {
-                dout("inverting session/ino locks on %p\n",
-                     session);
-                session = ceph_get_mds_session(session);
-                spin_unlock(&ci->i_ceph_lock);
-                if (took_snap_rwsem) {
-                    up_read(&mdsc->snap_rwsem);
-                    took_snap_rwsem = 0;
-                }
-                if (session) {
-                    mutex_lock(&session->s_mutex);
-                    ceph_put_mds_session(session);
-                } else {
-                    /*
-                     * Because we take the reference while
-                     * holding the i_ceph_lock, it should
-                     * never be NULL. Throw a warning if it
-                     * ever is.
-                     */
-                    WARN_ON_ONCE(true);
-                }
-                goto retry;
-            }
-        }
+        ceph_put_mds_session(session);
+        session = ceph_get_mds_session(cap->session);
 
         /* kick flushing and flush snaps before sending normal
          * cap message */
@@ -2130,20 +2087,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
         if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
             __ceph_flush_snaps(ci, session);
 
-            goto retry_locked;
-        }
-
-        /* take snap_rwsem after session mutex */
-        if (!took_snap_rwsem) {
-            if (down_read_trylock(&mdsc->snap_rwsem) == 0) {
-                dout("inverting snap/in locks on %p\n",
-                     inode);
-                spin_unlock(&ci->i_ceph_lock);
-                down_read(&mdsc->snap_rwsem);
-                took_snap_rwsem = 1;
-                goto retry;
-            }
-            took_snap_rwsem = 1;
+            goto retry;
         }
 
         if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
@@ -2165,9 +2109,10 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
             __prep_cap(&arg, cap, CEPH_CAP_OP_UPDATE, mflags, cap_used,
                        want, retain, flushing, flush_tid, oldest_flush_tid);
-            spin_unlock(&ci->i_ceph_lock);
 
+            spin_unlock(&ci->i_ceph_lock);
             __send_cap(&arg, ci);
+            spin_lock(&ci->i_ceph_lock);
 
             goto retry; /* retake i_ceph_lock and restart our cap scan. */
         }
@@ -2182,13 +2127,9 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
 
     spin_unlock(&ci->i_ceph_lock);
 
+    ceph_put_mds_session(session);
     if (queue_invalidate)
         ceph_queue_invalidate(inode);
-
-    if (session)
-        mutex_unlock(&session->s_mutex);
-    if (took_snap_rwsem)
-        up_read(&mdsc->snap_rwsem);
 }
 
 /*
@@ -2198,26 +2139,17 @@ static int try_flush_caps(struct inode *inode, u64 *ptid)
 {
     struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
     struct ceph_inode_info *ci = ceph_inode(inode);
-    struct ceph_mds_session *session = NULL;
     int flushing = 0;
     u64 flush_tid = 0, oldest_flush_tid = 0;
 
-retry:
     spin_lock(&ci->i_ceph_lock);
 retry_locked:
     if (ci->i_dirty_caps && ci->i_auth_cap) {
         struct ceph_cap *cap = ci->i_auth_cap;
         struct cap_msg_args arg;
+        struct ceph_mds_session *session = cap->session;
 
-        if (session != cap->session) {
-            spin_unlock(&ci->i_ceph_lock);
-            if (session)
-                mutex_unlock(&session->s_mutex);
-            session = cap->session;
-            mutex_lock(&session->s_mutex);
-            goto retry;
-        }
-        if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) {
+        if (session->s_state < CEPH_MDS_SESSION_OPEN) {
             spin_unlock(&ci->i_ceph_lock);
             goto out;
         }
@@ -2254,9 +2186,6 @@ static int try_flush_caps(struct inode *inode, u64 *ptid)
         spin_unlock(&ci->i_ceph_lock);
     }
 out:
-    if (session)
-        mutex_unlock(&session->s_mutex);
-
     *ptid = flush_tid;
     return flushing;
 }
@@ -3213,8 +3142,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
     if (complete_capsnap)
         wake_up_all(&ci->i_cap_wq);
     while (put-- > 0) {
-        /* avoid calling iput_final() in osd dispatch threads */
-        ceph_async_iput(inode);
+        iput(inode);
     }
 }
@@ -3288,7 +3216,7 @@ static void handle_cap_grant(struct inode *inode,
     u64 size = le64_to_cpu(grant->size);
     u64 max_size = le64_to_cpu(grant->max_size);
     unsigned char check_caps = 0;
-    bool was_stale = cap->cap_gen < session->s_cap_gen;
+    bool was_stale = cap->cap_gen < atomic_read(&session->s_cap_gen);
     bool wake = false;
     bool writeback = false;
     bool queue_trunc = false;
@@ -3340,7 +3268,7 @@ static void handle_cap_grant(struct inode *inode,
     }
 
     /* side effects now are allowed */
-    cap->cap_gen = session->s_cap_gen;
+    cap->cap_gen = atomic_read(&session->s_cap_gen);
     cap->seq = seq;
 
     __check_cap_issue(ci, cap, newcaps);
@@ -3553,13 +3481,12 @@ static void handle_cap_grant(struct inode *inode,
     if (wake)
         wake_up_all(&ci->i_cap_wq);
 
+    mutex_unlock(&session->s_mutex);
     if (check_caps == 1)
         ceph_check_caps(ci, CHECK_CAPS_AUTHONLY | CHECK_CAPS_NOINVAL,
                         session);
     else if (check_caps == 2)
         ceph_check_caps(ci, CHECK_CAPS_NOINVAL, session);
-    else
-        mutex_unlock(&session->s_mutex);
 }
 
 /*
@@ -4203,8 +4130,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
     mutex_unlock(&session->s_mutex);
 done_unlocked:
     ceph_put_string(extra_info.pool_ns);
-    /* avoid calling iput_final() in mds dispatch threads */
-    ceph_async_iput(inode);
+    iput(inode);
     return;
 
 flush_cap_releases:
@@ -4246,8 +4172,7 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
         spin_unlock(&mdsc->cap_delay_lock);
         dout("check_delayed_caps on %p\n", inode);
         ceph_check_caps(ci, 0, NULL);
-        /* avoid calling iput_final() in tick thread */
-        ceph_async_iput(inode);
+        iput(inode);
         spin_lock(&mdsc->cap_delay_lock);
     }
 }
...
@@ -127,7 +127,7 @@ static int mdsc_show(struct seq_file *s, void *p)
     return 0;
 }
 
-#define CEPH_METRIC_SHOW(name, total, avg, min, max, sq) {     \
+#define CEPH_LAT_METRIC_SHOW(name, total, avg, min, max, sq) { \
     s64 _total, _avg, _min, _max, _sq, _st;                    \
     _avg = ktime_to_us(avg);                                   \
     _min = ktime_to_us(min == KTIME_MAX ? 0 : min);            \
@@ -140,6 +140,12 @@ static int mdsc_show(struct seq_file *s, void *p)
                name, total, _avg, _min, _max, _st);            \
 }
 
+#define CEPH_SZ_METRIC_SHOW(name, total, avg, min, max, sum) { \
+    u64 _min = min == U64_MAX ? 0 : min;                       \
+    seq_printf(s, "%-14s%-12lld%-16llu%-16llu%-16llu%llu\n",   \
+               name, total, avg, _min, max, sum);              \
+}
+
 static int metric_show(struct seq_file *s, void *p)
 {
     struct ceph_fs_client *fsc = s->private;
@@ -147,6 +153,7 @@ static int metric_show(struct seq_file *s, void *p)
     struct ceph_client_metric *m = &mdsc->metric;
     int nr_caps = 0;
     s64 total, sum, avg, min, max, sq;
+    u64 sum_sz, avg_sz, min_sz, max_sz;
 
     sum = percpu_counter_sum(&m->total_inodes);
     seq_printf(s, "item total\n");
@@ -170,7 +177,7 @@ static int metric_show(struct seq_file *s, void *p)
     max = m->read_latency_max;
     sq = m->read_latency_sq_sum;
     spin_unlock(&m->read_metric_lock);
-    CEPH_METRIC_SHOW("read", total, avg, min, max, sq);
+    CEPH_LAT_METRIC_SHOW("read", total, avg, min, max, sq);
 
     spin_lock(&m->write_metric_lock);
     total = m->total_writes;
@@ -180,7 +187,7 @@ static int metric_show(struct seq_file *s, void *p)
     max = m->write_latency_max;
     sq = m->write_latency_sq_sum;
     spin_unlock(&m->write_metric_lock);
-    CEPH_METRIC_SHOW("write", total, avg, min, max, sq);
+    CEPH_LAT_METRIC_SHOW("write", total, avg, min, max, sq);
 
     spin_lock(&m->metadata_metric_lock);
     total = m->total_metadatas;
@@ -190,7 +197,29 @@ static int metric_show(struct seq_file *s, void *p)
     max = m->metadata_latency_max;
     sq = m->metadata_latency_sq_sum;
     spin_unlock(&m->metadata_metric_lock);
-    CEPH_METRIC_SHOW("metadata", total, avg, min, max, sq);
+    CEPH_LAT_METRIC_SHOW("metadata", total, avg, min, max, sq);
+
+    seq_printf(s, "\n");
+    seq_printf(s, "item total avg_sz(bytes) min_sz(bytes) max_sz(bytes) total_sz(bytes)\n");
+    seq_printf(s, "----------------------------------------------------------------------------------------\n");
+
+    spin_lock(&m->read_metric_lock);
+    total = m->total_reads;
+    sum_sz = m->read_size_sum;
+    avg_sz = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum_sz, total) : 0;
+    min_sz = m->read_size_min;
+    max_sz = m->read_size_max;
+    spin_unlock(&m->read_metric_lock);
+    CEPH_SZ_METRIC_SHOW("read", total, avg_sz, min_sz, max_sz, sum_sz);
+
+    spin_lock(&m->write_metric_lock);
+    total = m->total_writes;
+    sum_sz = m->write_size_sum;
+    avg_sz = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum_sz, total) : 0;
+    min_sz = m->write_size_min;
+    max_sz = m->write_size_max;
+    spin_unlock(&m->write_metric_lock);
+    CEPH_SZ_METRIC_SHOW("write", total, avg_sz, min_sz, max_sz, sum_sz);
 
     seq_printf(s, "\n");
     seq_printf(s, "item total miss hit\n");
...
@@ -788,6 +788,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
             mask |= CEPH_CAP_XATTR_SHARED;
         req->r_args.getattr.mask = cpu_to_le32(mask);
 
+        ihold(dir);
         req->r_parent = dir;
         set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
         err = ceph_mdsc_do_request(mdsc, NULL, req);
@@ -868,6 +869,7 @@ static int ceph_mknod(struct user_namespace *mnt_userns, struct inode *dir,
     req->r_dentry = dget(dentry);
     req->r_num_caps = 2;
     req->r_parent = dir;
+    ihold(dir);
     set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
     req->r_args.mknod.mode = cpu_to_le32(mode);
     req->r_args.mknod.rdev = cpu_to_le32(rdev);
@@ -929,6 +931,8 @@ static int ceph_symlink(struct user_namespace *mnt_userns, struct inode *dir,
         goto out;
     }
     req->r_parent = dir;
+    ihold(dir);
+
     set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
     req->r_dentry = dget(dentry);
     req->r_num_caps = 2;
@@ -993,6 +997,7 @@ static int ceph_mkdir(struct user_namespace *mnt_userns, struct inode *dir,
     req->r_dentry = dget(dentry);
     req->r_num_caps = 2;
     req->r_parent = dir;
+    ihold(dir);
     set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
     req->r_args.mkdir.mode = cpu_to_le32(mode);
     req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
@@ -1037,6 +1042,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
     req->r_num_caps = 2;
     req->r_old_dentry = dget(old_dentry);
     req->r_parent = dir;
+    ihold(dir);
     set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
     req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
     req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
@@ -1158,6 +1164,7 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
     req->r_dentry = dget(dentry);
     req->r_num_caps = 2;
     req->r_parent = dir;
+    ihold(dir);
     req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
     req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
     req->r_inode_drop = ceph_drop_caps_for_unlink(inode);
@@ -1232,6 +1239,7 @@ static int ceph_rename(struct user_namespace *mnt_userns, struct inode *old_dir,
     req->r_old_dentry = dget(old_dentry);
     req->r_old_dentry_dir = old_dir;
     req->r_parent = new_dir;
+    ihold(new_dir);
     set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
     req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED;
     req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
@@ -1548,10 +1556,8 @@ static bool __dentry_lease_is_valid(struct ceph_dentry_info *di)
         u32 gen;
         unsigned long ttl;
 
-        spin_lock(&session->s_gen_ttl_lock);
-        gen = session->s_cap_gen;
+        gen = atomic_read(&session->s_cap_gen);
         ttl = session->s_cap_ttl;
-        spin_unlock(&session->s_gen_ttl_lock);
 
         if (di->lease_gen == gen &&
             time_before(jiffies, ttl) &&
@@ -1730,6 +1736,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
         req->r_dentry = dget(dentry);
         req->r_num_caps = 2;
         req->r_parent = dir;
+        ihold(dir);
 
         mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
         if (ceph_security_xattr_wanted(dir))
@@ -1809,8 +1816,7 @@ static void ceph_d_release(struct dentry *dentry)
     dentry->d_fsdata = NULL;
     spin_unlock(&dentry->d_lock);
 
-    if (di->lease_session)
-        ceph_put_mds_session(di->lease_session);
+    ceph_put_mds_session(di->lease_session);
     kmem_cache_free(ceph_dentry_cachep, di);
 }
...
@@ -542,6 +542,7 @@ static int ceph_get_name(struct dentry *parent, char *name,
     ihold(inode);
     req->r_ino2 = ceph_vino(d_inode(parent));
     req->r_parent = d_inode(parent);
+    ihold(req->r_parent);
     set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
     req->r_num_caps = 2;
     err = ceph_mdsc_do_request(mdsc, NULL, req);
...
@@ -706,6 +706,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
         mask |= CEPH_CAP_XATTR_SHARED;
     req->r_args.open.mask = cpu_to_le32(mask);
     req->r_parent = dir;
+    ihold(dir);
 
     if (flags & O_CREAT) {
         struct ceph_file_layout lo;
@@ -903,7 +904,7 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
         ceph_update_read_metrics(&fsc->mdsc->metric,
                                  req->r_start_latency,
                                  req->r_end_latency,
-                                 ret);
+                                 len, ret);
 
         ceph_osdc_put_request(req);
@@ -1035,12 +1036,12 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
     struct ceph_aio_request *aio_req = req->r_priv;
     struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
     struct ceph_client_metric *metric = &ceph_sb_to_mdsc(inode->i_sb)->metric;
+    unsigned int len = osd_data->bvec_pos.iter.bi_size;
 
     BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS);
     BUG_ON(!osd_data->num_bvecs);
 
-    dout("ceph_aio_complete_req %p rc %d bytes %u\n",
-         inode, rc, osd_data->bvec_pos.iter.bi_size);
+    dout("ceph_aio_complete_req %p rc %d bytes %u\n", inode, rc, len);
 
     if (rc == -EOLDSNAPC) {
         struct ceph_aio_work *aio_work;
@@ -1058,9 +1059,9 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
     } else if (!aio_req->write) {
         if (rc == -ENOENT)
             rc = 0;
-        if (rc >= 0 && osd_data->bvec_pos.iter.bi_size > rc) {
+        if (rc >= 0 && len > rc) {
             struct iov_iter i;
-            int zlen = osd_data->bvec_pos.iter.bi_size - rc;
+            int zlen = len - rc;
 
             /*
              * If read is satisfied by single OSD request,
@@ -1077,8 +1078,7 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
             }
 
             iov_iter_bvec(&i, READ, osd_data->bvec_pos.bvecs,
-                          osd_data->num_bvecs,
-                          osd_data->bvec_pos.iter.bi_size);
+                          osd_data->num_bvecs, len);
             iov_iter_advance(&i, rc);
             iov_iter_zero(zlen, &i);
         }
@@ -1088,10 +1088,10 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
     if (req->r_start_latency) {
         if (aio_req->write)
             ceph_update_write_metrics(metric, req->r_start_latency,
-                                      req->r_end_latency, rc);
+                                      req->r_end_latency, len, rc);
         else
             ceph_update_read_metrics(metric, req->r_start_latency,
-                                     req->r_end_latency, rc);
+                                     req->r_end_latency, len, rc);
     }
 
     put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs,
@@ -1299,10 +1299,10 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
         if (write)
             ceph_update_write_metrics(metric, req->r_start_latency,
-                                      req->r_end_latency, ret);
+                                      req->r_end_latency, len, ret);
         else
             ceph_update_read_metrics(metric, req->r_start_latency,
-                                     req->r_end_latency, ret);
+                                     req->r_end_latency, len, ret);
 
         size = i_size_read(inode);
         if (!write) {
@@ -1476,7 +1476,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
     ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
     ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
-                              req->r_end_latency, ret);
+                              req->r_end_latency, len, ret);
 out:
     ceph_osdc_put_request(req);
     if (ret != 0) {
...
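A pattern worth noting across the hunks above (lookup, mknod, symlink, mkdir, link, unlink, rename, atomic_open, get_name): the reference on the parent inode is now taken with ihold() at the exact point req->r_parent is assigned, rather than later during request submission, so the pointer owns a reference for its whole lifetime and the matching iput() lives in the request release path. A small sketch of the idiom with hypothetical stand-in types (not the kernel's):

#include <stdio.h>
#include <stdlib.h>

struct obj {
    int refcount;
};

static void obj_hold(struct obj *o) { o->refcount++; }

static void obj_put(struct obj *o)
{
    if (o && --o->refcount == 0)
        free(o);
}

struct request {
    struct obj *parent;   /* owns one reference while non-NULL */
};

/* Take the reference where the pointer is stored... */
static void request_set_parent(struct request *req, struct obj *dir)
{
    obj_hold(dir);
    req->parent = dir;
}

/* ...and drop it where the pointer is released. */
static void request_release(struct request *req)
{
    obj_put(req->parent);
    req->parent = NULL;
}

int main(void)
{
    struct obj *dir = calloc(1, sizeof(*dir));
    struct request req = {0};

    obj_hold(dir);                  /* caller's own reference */
    request_set_parent(&req, dir);
    request_release(&req);          /* request's reference dropped */
    obj_put(dir);                   /* caller's reference dropped, freed */
    printf("done\n");
    return 0;
}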
@@ -1124,7 +1124,7 @@ static void __update_dentry_lease(struct inode *dir, struct dentry *dentry,
         return;
     }
 
-    if (di->lease_gen == session->s_cap_gen &&
+    if (di->lease_gen == atomic_read(&session->s_cap_gen) &&
         time_before(ttl, di->time))
         return; /* we already have a newer lease. */
@@ -1135,7 +1135,7 @@ static void __update_dentry_lease(struct inode *dir, struct dentry *dentry,
     if (!di->lease_session)
         di->lease_session = ceph_get_mds_session(session);
-    di->lease_gen = session->s_cap_gen;
+    di->lease_gen = atomic_read(&session->s_cap_gen);
     di->lease_seq = le32_to_cpu(lease->seq);
     di->lease_renew_after = half_ttl;
     di->lease_renew_from = 0;
@@ -1154,8 +1154,7 @@ static inline void update_dentry_lease(struct inode *dir, struct dentry *dentry,
     __update_dentry_lease(dir, dentry, lease, session, from_time,
                           &old_lease_session);
     spin_unlock(&dentry->d_lock);
-    if (old_lease_session)
-        ceph_put_mds_session(old_lease_session);
+    ceph_put_mds_session(old_lease_session);
 }
 
 /*
@@ -1200,8 +1199,7 @@ static void update_dentry_lease_careful(struct dentry *dentry,
                           from_time, &old_lease_session);
 out_unlock:
     spin_unlock(&dentry->d_lock);
-    if (old_lease_session)
-        ceph_put_mds_session(old_lease_session);
+    ceph_put_mds_session(old_lease_session);
 }
 
 /*
@@ -1568,8 +1566,7 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
             unlock_new_inode(in);
         }
 
-        /* avoid calling iput_final() in mds dispatch threads */
-        ceph_async_iput(in);
+        iput(in);
     }
 
     return err;
@@ -1766,13 +1763,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
         if (ret < 0) {
             pr_err("ceph_fill_inode badness on %p\n", in);
             if (d_really_is_negative(dn)) {
-                /* avoid calling iput_final() in mds
-                 * dispatch threads */
                 if (in->i_state & I_NEW) {
                     ihold(in);
                     discard_new_inode(in);
                 }
-                ceph_async_iput(in);
+                iput(in);
             }
             d_drop(dn);
             err = ret;
@@ -1785,7 +1780,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
             if (ceph_security_xattr_deadlock(in)) {
                 dout(" skip splicing dn %p to inode %p"
                      " (security xattr deadlock)\n", dn, in);
-                ceph_async_iput(in);
+                iput(in);
                 skipped++;
                 goto next_item;
             }
@@ -1836,25 +1831,6 @@ bool ceph_inode_set_size(struct inode *inode, loff_t size)
     return ret;
 }
 
-/*
- * Put reference to inode, but avoid calling iput_final() in current thread.
- * iput_final() may wait for reahahead pages. The wait can cause deadlock in
- * some contexts.
- */
-void ceph_async_iput(struct inode *inode)
-{
-    if (!inode)
-        return;
-
-    for (;;) {
-        if (atomic_add_unless(&inode->i_count, -1, 1))
-            break;
-        if (queue_work(ceph_inode_to_client(inode)->inode_wq,
-                       &ceph_inode(inode)->i_work))
-            break;
-        /* queue work failed, i_count must be at least 2 */
-    }
-}
-
 void ceph_queue_inode_work(struct inode *inode, int work_bit)
 {
     struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
...
@@ -664,6 +664,9 @@ struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
 
 void ceph_put_mds_session(struct ceph_mds_session *s)
 {
+    if (IS_ERR_OR_NULL(s))
+        return;
+
     dout("mdsc put_session %p %d -> %d\n", s,
          refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
     if (refcount_dec_and_test(&s->s_ref)) {
@@ -746,8 +749,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
     ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
 
-    spin_lock_init(&s->s_gen_ttl_lock);
-    s->s_cap_gen = 1;
+    atomic_set(&s->s_cap_gen, 1);
     s->s_cap_ttl = jiffies - 1;
 
     spin_lock_init(&s->s_cap_lock);
@@ -822,14 +824,13 @@ void ceph_mdsc_release_request(struct kref *kref)
         ceph_msg_put(req->r_reply);
     if (req->r_inode) {
         ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
-        /* avoid calling iput_final() in mds dispatch threads */
-        ceph_async_iput(req->r_inode);
+        iput(req->r_inode);
     }
     if (req->r_parent) {
         ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
-        ceph_async_iput(req->r_parent);
+        iput(req->r_parent);
     }
-    ceph_async_iput(req->r_target_inode);
+    iput(req->r_target_inode);
     if (req->r_dentry)
         dput(req->r_dentry);
     if (req->r_old_dentry)
@@ -843,7 +844,7 @@ void ceph_mdsc_release_request(struct kref *kref)
          */
         ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
                           CEPH_CAP_PIN);
-        ceph_async_iput(req->r_old_dentry_dir);
+        iput(req->r_old_dentry_dir);
     }
     kfree(req->r_path1);
     kfree(req->r_path2);
@@ -958,8 +959,7 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
     }
 
     if (req->r_unsafe_dir) {
-        /* avoid calling iput_final() in mds dispatch threads */
-        ceph_async_iput(req->r_unsafe_dir);
+        iput(req->r_unsafe_dir);
         req->r_unsafe_dir = NULL;
     }
@@ -1130,7 +1130,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
         cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
         if (!cap) {
             spin_unlock(&ci->i_ceph_lock);
-            ceph_async_iput(inode);
+            iput(inode);
             goto random;
         }
         mds = cap->session->s_mds;
@@ -1139,9 +1139,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
              cap == ci->i_auth_cap ? "auth " : "", cap);
         spin_unlock(&ci->i_ceph_lock);
 out:
-    /* avoid calling iput_final() while holding mdsc->mutex or
-     * in mds dispatch threads */
-    ceph_async_iput(inode);
+    iput(inode);
     return mds;
 
 random:
@@ -1438,8 +1436,7 @@ static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
 
     for (i = 0; i < mi->num_export_targets; i++) {
         ts = __open_export_target_session(mdsc, mi->export_targets[i]);
-        if (!IS_ERR(ts))
-            ceph_put_mds_session(ts);
+        ceph_put_mds_session(ts);
     }
 }
@@ -1545,9 +1542,7 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session,
         spin_unlock(&session->s_cap_lock);
 
         if (last_inode) {
-            /* avoid calling iput_final() while holding
-             * s_mutex or in mds dispatch threads */
-            ceph_async_iput(last_inode);
+            iput(last_inode);
             last_inode = NULL;
         }
         if (old_cap) {
@@ -1581,7 +1576,7 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session,
     session->s_cap_iterator = NULL;
     spin_unlock(&session->s_cap_lock);
 
-    ceph_async_iput(last_inode);
+    iput(last_inode);
     if (old_cap)
         ceph_put_cap(session->s_mdsc, old_cap);
@@ -1721,8 +1716,7 @@ static void remove_session_caps(struct ceph_mds_session *session)
             spin_unlock(&session->s_cap_lock);
 
             inode = ceph_find_inode(sb, vino);
-            /* avoid calling iput_final() while holding s_mutex */
-            ceph_async_iput(inode);
+            iput(inode);
 
             spin_lock(&session->s_cap_lock);
         }
@@ -1761,7 +1755,7 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
         ci->i_requested_max_size = 0;
         spin_unlock(&ci->i_ceph_lock);
     } else if (ev == RENEWCAPS) {
-        if (cap->cap_gen < cap->session->s_cap_gen) {
+        if (cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) {
             /* mds did not re-issue stale cap */
             spin_lock(&ci->i_ceph_lock);
             cap->issued = cap->implemented = CEPH_CAP_PIN;
@@ -2988,7 +2982,6 @@ int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
         ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
         __ceph_touch_fmode(ci, mdsc, fmode);
         spin_unlock(&ci->i_ceph_lock);
-        ihold(req->r_parent);
     }
     if (req->r_old_dentry_dir)
         ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
@@ -3499,10 +3492,8 @@ static void handle_session(struct ceph_mds_session *session,
     case CEPH_SESSION_STALE:
         pr_info("mds%d caps went stale, renewing\n",
                 session->s_mds);
-        spin_lock(&session->s_gen_ttl_lock);
-        session->s_cap_gen++;
+        atomic_inc(&session->s_cap_gen);
         session->s_cap_ttl = jiffies - 1;
-        spin_unlock(&session->s_gen_ttl_lock);
         send_renew_caps(mdsc, session);
         break;
@@ -3771,7 +3762,7 @@ static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
     cap->seq = 0;        /* reset cap seq */
     cap->issue_seq = 0;  /* and issue_seq */
     cap->mseq = 0;       /* and migrate_seq */
-    cap->cap_gen = cap->session->s_cap_gen;
+    cap->cap_gen = atomic_read(&cap->session->s_cap_gen);
 
     /* These are lost when the session goes away */
     if (S_ISDIR(inode->i_mode)) {
dout("session %p state %s\n", session, dout("session %p state %s\n", session,
ceph_session_state_name(session->s_state)); ceph_session_state_name(session->s_state));
spin_lock(&session->s_gen_ttl_lock); atomic_inc(&session->s_cap_gen);
session->s_cap_gen++;
spin_unlock(&session->s_gen_ttl_lock);
spin_lock(&session->s_cap_lock); spin_lock(&session->s_cap_lock);
/* don't know if session is readonly */ /* don't know if session is readonly */
...@@ -4344,7 +4333,7 @@ static void handle_lease(struct ceph_mds_client *mdsc, ...@@ -4344,7 +4333,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
case CEPH_MDS_LEASE_RENEW: case CEPH_MDS_LEASE_RENEW:
if (di->lease_session == session && if (di->lease_session == session &&
di->lease_gen == session->s_cap_gen && di->lease_gen == atomic_read(&session->s_cap_gen) &&
di->lease_renew_from && di->lease_renew_from &&
di->lease_renew_after == 0) { di->lease_renew_after == 0) {
unsigned long duration = unsigned long duration =
...@@ -4372,8 +4361,7 @@ static void handle_lease(struct ceph_mds_client *mdsc, ...@@ -4372,8 +4361,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
out: out:
mutex_unlock(&session->s_mutex); mutex_unlock(&session->s_mutex);
/* avoid calling iput_final() in mds dispatch threads */ iput(inode);
ceph_async_iput(inode);
return; return;
bad: bad:
......
@@ -186,10 +186,8 @@ struct ceph_mds_session {
 
     struct ceph_auth_handshake s_auth;
 
-    /* protected by s_gen_ttl_lock */
-    spinlock_t        s_gen_ttl_lock;
-    u32               s_cap_gen;  /* inc each time we get mds stale msg */
-    unsigned long     s_cap_ttl;  /* when session caps expire */
+    atomic_t          s_cap_gen;  /* inc each time we get mds stale msg */
+    unsigned long     s_cap_ttl;  /* when session caps expire. protected by s_mutex */
 
     /* protected by s_cap_lock */
     spinlock_t        s_cap_lock;
...
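The mds_client.h hunk above is the heart of the s_gen_ttl_lock removal: the cap generation counter becomes an atomic_t, so the single writer increments it and readers do one atomic load instead of a spin_lock/spin_unlock pair. The same idea in a standalone C11 sketch (illustrative names, not the kernel's):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

struct session {
    atomic_uint cap_gen;    /* inc each time the MDS says caps went stale */
};

struct cap {
    unsigned int cap_gen;   /* generation the cap was issued under */
};

/* Writer (stale-session handling): one atomic increment, no lock. */
static void session_caps_stale(struct session *s)
{
    atomic_fetch_add(&s->cap_gen, 1);
}

/* Reader (an __cap_is_valid-style check): one atomic load, no lock. */
static bool cap_is_valid(const struct cap *c, const struct session *s)
{
    return c->cap_gen >= atomic_load(&s->cap_gen);
}

int main(void)
{
    struct session s;
    struct cap c = { .cap_gen = 1 };

    atomic_init(&s.cap_gen, 1);
    printf("valid: %d\n", cap_is_valid(&c, &s));  /* 1 */
    session_caps_stale(&s);
    printf("valid: %d\n", cap_is_valid(&c, &s));  /* 0 */
    return 0;
}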
@@ -20,8 +20,11 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
     struct ceph_opened_files *files;
     struct ceph_pinned_icaps *icaps;
     struct ceph_opened_inodes *inodes;
+    struct ceph_read_io_size *rsize;
+    struct ceph_write_io_size *wsize;
     struct ceph_client_metric *m = &mdsc->metric;
     u64 nr_caps = atomic64_read(&m->total_caps);
+    u32 header_len = sizeof(struct ceph_metric_header);
     struct ceph_msg *msg;
     struct timespec64 ts;
     s64 sum;
@@ -30,7 +33,8 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
 
     len = sizeof(*head) + sizeof(*cap) + sizeof(*read) + sizeof(*write)
           + sizeof(*meta) + sizeof(*dlease) + sizeof(*files)
-          + sizeof(*icaps) + sizeof(*inodes);
+          + sizeof(*icaps) + sizeof(*inodes) + sizeof(*rsize)
+          + sizeof(*wsize);
 
     msg = ceph_msg_new(CEPH_MSG_CLIENT_METRICS, len, GFP_NOFS, true);
     if (!msg) {
/* encode the cap metric */ /* encode the cap metric */
cap = (struct ceph_metric_cap *)(head + 1); cap = (struct ceph_metric_cap *)(head + 1);
cap->type = cpu_to_le32(CLIENT_METRIC_TYPE_CAP_INFO); cap->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_CAP_INFO);
cap->ver = 1; cap->header.ver = 1;
cap->compat = 1; cap->header.compat = 1;
cap->data_len = cpu_to_le32(sizeof(*cap) - 10); cap->header.data_len = cpu_to_le32(sizeof(*cap) - header_len);
cap->hit = cpu_to_le64(percpu_counter_sum(&m->i_caps_hit)); cap->hit = cpu_to_le64(percpu_counter_sum(&m->i_caps_hit));
cap->mis = cpu_to_le64(percpu_counter_sum(&m->i_caps_mis)); cap->mis = cpu_to_le64(percpu_counter_sum(&m->i_caps_mis));
cap->total = cpu_to_le64(nr_caps); cap->total = cpu_to_le64(nr_caps);
@@ -54,10 +58,10 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
 
     /* encode the read latency metric */
     read = (struct ceph_metric_read_latency *)(cap + 1);
-    read->type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_LATENCY);
-    read->ver = 1;
-    read->compat = 1;
-    read->data_len = cpu_to_le32(sizeof(*read) - 10);
+    read->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_LATENCY);
+    read->header.ver = 1;
+    read->header.compat = 1;
+    read->header.data_len = cpu_to_le32(sizeof(*read) - header_len);
     sum = m->read_latency_sum;
     jiffies_to_timespec64(sum, &ts);
     read->sec = cpu_to_le32(ts.tv_sec);
@@ -66,10 +70,10 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
 
     /* encode the write latency metric */
     write = (struct ceph_metric_write_latency *)(read + 1);
-    write->type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_LATENCY);
-    write->ver = 1;
-    write->compat = 1;
-    write->data_len = cpu_to_le32(sizeof(*write) - 10);
+    write->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_LATENCY);
+    write->header.ver = 1;
+    write->header.compat = 1;
+    write->header.data_len = cpu_to_le32(sizeof(*write) - header_len);
     sum = m->write_latency_sum;
     jiffies_to_timespec64(sum, &ts);
     write->sec = cpu_to_le32(ts.tv_sec);
@@ -78,10 +82,10 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
 
     /* encode the metadata latency metric */
     meta = (struct ceph_metric_metadata_latency *)(write + 1);
-    meta->type = cpu_to_le32(CLIENT_METRIC_TYPE_METADATA_LATENCY);
-    meta->ver = 1;
-    meta->compat = 1;
-    meta->data_len = cpu_to_le32(sizeof(*meta) - 10);
+    meta->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_METADATA_LATENCY);
+    meta->header.ver = 1;
+    meta->header.compat = 1;
+    meta->header.data_len = cpu_to_le32(sizeof(*meta) - header_len);
     sum = m->metadata_latency_sum;
     jiffies_to_timespec64(sum, &ts);
     meta->sec = cpu_to_le32(ts.tv_sec);
...@@ -90,10 +94,10 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc, ...@@ -90,10 +94,10 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
/* encode the dentry lease metric */ /* encode the dentry lease metric */
dlease = (struct ceph_metric_dlease *)(meta + 1); dlease = (struct ceph_metric_dlease *)(meta + 1);
dlease->type = cpu_to_le32(CLIENT_METRIC_TYPE_DENTRY_LEASE); dlease->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_DENTRY_LEASE);
dlease->ver = 1; dlease->header.ver = 1;
dlease->compat = 1; dlease->header.compat = 1;
dlease->data_len = cpu_to_le32(sizeof(*dlease) - 10); dlease->header.data_len = cpu_to_le32(sizeof(*dlease) - header_len);
dlease->hit = cpu_to_le64(percpu_counter_sum(&m->d_lease_hit)); dlease->hit = cpu_to_le64(percpu_counter_sum(&m->d_lease_hit));
dlease->mis = cpu_to_le64(percpu_counter_sum(&m->d_lease_mis)); dlease->mis = cpu_to_le64(percpu_counter_sum(&m->d_lease_mis));
dlease->total = cpu_to_le64(atomic64_read(&m->total_dentries)); dlease->total = cpu_to_le64(atomic64_read(&m->total_dentries));
...@@ -103,34 +107,54 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc, ...@@ -103,34 +107,54 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
/* encode the opened files metric */ /* encode the opened files metric */
files = (struct ceph_opened_files *)(dlease + 1); files = (struct ceph_opened_files *)(dlease + 1);
files->type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_FILES); files->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_FILES);
files->ver = 1; files->header.ver = 1;
files->compat = 1; files->header.compat = 1;
files->data_len = cpu_to_le32(sizeof(*files) - 10); files->header.data_len = cpu_to_le32(sizeof(*files) - header_len);
files->opened_files = cpu_to_le64(atomic64_read(&m->opened_files)); files->opened_files = cpu_to_le64(atomic64_read(&m->opened_files));
files->total = cpu_to_le64(sum); files->total = cpu_to_le64(sum);
items++; items++;
/* encode the pinned icaps metric */ /* encode the pinned icaps metric */
icaps = (struct ceph_pinned_icaps *)(files + 1); icaps = (struct ceph_pinned_icaps *)(files + 1);
icaps->type = cpu_to_le32(CLIENT_METRIC_TYPE_PINNED_ICAPS); icaps->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_PINNED_ICAPS);
icaps->ver = 1; icaps->header.ver = 1;
icaps->compat = 1; icaps->header.compat = 1;
icaps->data_len = cpu_to_le32(sizeof(*icaps) - 10); icaps->header.data_len = cpu_to_le32(sizeof(*icaps) - header_len);
icaps->pinned_icaps = cpu_to_le64(nr_caps); icaps->pinned_icaps = cpu_to_le64(nr_caps);
icaps->total = cpu_to_le64(sum); icaps->total = cpu_to_le64(sum);
items++; items++;
/* encode the opened inodes metric */ /* encode the opened inodes metric */
inodes = (struct ceph_opened_inodes *)(icaps + 1); inodes = (struct ceph_opened_inodes *)(icaps + 1);
inodes->type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_INODES); inodes->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_INODES);
inodes->ver = 1; inodes->header.ver = 1;
inodes->compat = 1; inodes->header.compat = 1;
inodes->data_len = cpu_to_le32(sizeof(*inodes) - 10); inodes->header.data_len = cpu_to_le32(sizeof(*inodes) - header_len);
inodes->opened_inodes = cpu_to_le64(percpu_counter_sum(&m->opened_inodes)); inodes->opened_inodes = cpu_to_le64(percpu_counter_sum(&m->opened_inodes));
inodes->total = cpu_to_le64(sum); inodes->total = cpu_to_le64(sum);
items++; items++;
/* encode the read io size metric */
rsize = (struct ceph_read_io_size *)(inodes + 1);
rsize->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_IO_SIZES);
rsize->header.ver = 1;
rsize->header.compat = 1;
rsize->header.data_len = cpu_to_le32(sizeof(*rsize) - header_len);
rsize->total_ops = cpu_to_le64(m->total_reads);
rsize->total_size = cpu_to_le64(m->read_size_sum);
items++;
/* encode the write io size metric */
wsize = (struct ceph_write_io_size *)(rsize + 1);
wsize->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_IO_SIZES);
wsize->header.ver = 1;
wsize->header.compat = 1;
wsize->header.data_len = cpu_to_le32(sizeof(*wsize) - header_len);
wsize->total_ops = cpu_to_le64(m->total_writes);
wsize->total_size = cpu_to_le64(m->write_size_sum);
items++;
put_unaligned_le32(items, &head->num); put_unaligned_le32(items, &head->num);
msg->front.iov_len = len; msg->front.iov_len = len;
msg->hdr.version = cpu_to_le16(1); msg->hdr.version = cpu_to_le16(1);
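Note that every per-metric block above now fills a shared struct ceph_metric_header rather than four open-coded fields, and data_len is computed against header_len (presumably sizeof(struct ceph_metric_header), defined in an elided part of this hunk) instead of the old magic constant 10, the packed size of le32 type + u8 ver + u8 compat + le32 data_len. A hypothetical helper, not part of this patch, makes the repeated pattern explicit:

	/* Illustrative only: this helper does not exist in the patch;
	 * each encoder open-codes the same four assignments. */
	static void encode_metric_header(struct ceph_metric_header *hdr,
					 u32 type, size_t struct_len)
	{
		hdr->type = cpu_to_le32(type);
		hdr->ver = 1;
		hdr->compat = 1;
		/* data_len counts only the payload after the header */
		hdr->data_len = cpu_to_le32(struct_len - sizeof(*hdr));
	}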
@@ -225,6 +249,9 @@ int ceph_metric_init(struct ceph_client_metric *m)
	m->read_latency_max = 0;
	m->total_reads = 0;
	m->read_latency_sum = 0;
+	m->read_size_min = U64_MAX;
+	m->read_size_max = 0;
+	m->read_size_sum = 0;

	spin_lock_init(&m->write_metric_lock);
	m->write_latency_sq_sum = 0;
@@ -232,6 +259,9 @@ int ceph_metric_init(struct ceph_client_metric *m)
	m->write_latency_max = 0;
	m->total_writes = 0;
	m->write_latency_sum = 0;
+	m->write_size_min = U64_MAX;
+	m->write_size_max = 0;
+	m->write_size_sum = 0;

	spin_lock_init(&m->metadata_metric_lock);
	m->metadata_latency_sq_sum = 0;
@@ -281,23 +311,21 @@ void ceph_metric_destroy(struct ceph_client_metric *m)
	cancel_delayed_work_sync(&m->delayed_work);

-	if (m->session)
-		ceph_put_mds_session(m->session);
+	ceph_put_mds_session(m->session);
}

+#define METRIC_UPDATE_MIN_MAX(min, max, new)	\
+{						\
+	if (unlikely(new < min))		\
+		min = new;			\
+	if (unlikely(new > max))		\
+		max = new;			\
+}
+
-static inline void __update_latency(ktime_t *totalp, ktime_t *lsump,
-				    ktime_t *min, ktime_t *max,
-				    ktime_t *sq_sump, ktime_t lat)
+static inline void __update_stdev(ktime_t total, ktime_t lsum,
+				  ktime_t *sq_sump, ktime_t lat)
{
-	ktime_t total, avg, sq, lsum;
-
-	total = ++(*totalp);
-	lsum = (*lsump += lat);
-
-	if (unlikely(lat < *min))
-		*min = lat;
-	if (unlikely(lat > *max))
-		*max = lat;
+	ktime_t avg, sq;

	if (unlikely(total == 1))
		return;
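The tail of __update_stdev is elided by the hunk above. Judging from the surrounding code, the remainder presumably accumulates the squared deviation from the running mean, roughly as in this sketch:

	/* Presumed remainder of __update_stdev (elided above); sketch only. */
	avg = DIV64_U64_ROUND_CLOSEST(lsum, total);
	sq = lat - avg;
	sq = sq * sq;
	*sq_sump += sq;

The key difference from __update_latency is that the count and sum are now maintained by the callers, so the helper is reduced to computing the standard-deviation term, hence the rename.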
@@ -312,33 +340,51 @@ static inline void __update_latency(ktime_t *totalp, ktime_t *lsump,
void ceph_update_read_metrics(struct ceph_client_metric *m,
			      ktime_t r_start, ktime_t r_end,
-			      int rc)
+			      unsigned int size, int rc)
{
	ktime_t lat = ktime_sub(r_end, r_start);
+	ktime_t total;

	if (unlikely(rc < 0 && rc != -ENOENT && rc != -ETIMEDOUT))
		return;

	spin_lock(&m->read_metric_lock);
-	__update_latency(&m->total_reads, &m->read_latency_sum,
-			 &m->read_latency_min, &m->read_latency_max,
-			 &m->read_latency_sq_sum, lat);
+	total = ++m->total_reads;
+	m->read_size_sum += size;
+	m->read_latency_sum += lat;
+	METRIC_UPDATE_MIN_MAX(m->read_size_min,
+			      m->read_size_max,
+			      size);
+	METRIC_UPDATE_MIN_MAX(m->read_latency_min,
+			      m->read_latency_max,
+			      lat);
+	__update_stdev(total, m->read_latency_sum,
+		       &m->read_latency_sq_sum, lat);
	spin_unlock(&m->read_metric_lock);
}
void ceph_update_write_metrics(struct ceph_client_metric *m,
			       ktime_t r_start, ktime_t r_end,
-			       int rc)
+			       unsigned int size, int rc)
{
	ktime_t lat = ktime_sub(r_end, r_start);
+	ktime_t total;

	if (unlikely(rc && rc != -ETIMEDOUT))
		return;

	spin_lock(&m->write_metric_lock);
-	__update_latency(&m->total_writes, &m->write_latency_sum,
-			 &m->write_latency_min, &m->write_latency_max,
-			 &m->write_latency_sq_sum, lat);
+	total = ++m->total_writes;
+	m->write_size_sum += size;
+	m->write_latency_sum += lat;
+	METRIC_UPDATE_MIN_MAX(m->write_size_min,
+			      m->write_size_max,
+			      size);
+	METRIC_UPDATE_MIN_MAX(m->write_latency_min,
+			      m->write_latency_max,
+			      lat);
+	__update_stdev(total, m->write_latency_sum,
+		       &m->write_latency_sq_sum, lat);
	spin_unlock(&m->write_metric_lock);
}

@@ -347,13 +393,18 @@ void ceph_update_metadata_metrics(struct ceph_client_metric *m,
				  int rc)
{
	ktime_t lat = ktime_sub(r_end, r_start);
+	ktime_t total;

	if (unlikely(rc && rc != -ENOENT))
		return;

	spin_lock(&m->metadata_metric_lock);
-	__update_latency(&m->total_metadatas, &m->metadata_latency_sum,
-			 &m->metadata_latency_min, &m->metadata_latency_max,
-			 &m->metadata_latency_sq_sum, lat);
+	total = ++m->total_metadatas;
+	m->metadata_latency_sum += lat;
+	METRIC_UPDATE_MIN_MAX(m->metadata_latency_min,
+			      m->metadata_latency_max,
+			      lat);
+	__update_stdev(total, m->metadata_latency_sum,
+		       &m->metadata_latency_sq_sum, lat);
	spin_unlock(&m->metadata_metric_lock);
}
@@ -17,8 +17,10 @@ enum ceph_metric_type {
	CLIENT_METRIC_TYPE_OPENED_FILES,
	CLIENT_METRIC_TYPE_PINNED_ICAPS,
	CLIENT_METRIC_TYPE_OPENED_INODES,
+	CLIENT_METRIC_TYPE_READ_IO_SIZES,
+	CLIENT_METRIC_TYPE_WRITE_IO_SIZES,

-	CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_OPENED_INODES,
+	CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_WRITE_IO_SIZES,
};

/*
@@ -34,18 +36,22 @@ enum ceph_metric_type {
	CLIENT_METRIC_TYPE_OPENED_FILES,	\
	CLIENT_METRIC_TYPE_PINNED_ICAPS,	\
	CLIENT_METRIC_TYPE_OPENED_INODES,	\
+	CLIENT_METRIC_TYPE_READ_IO_SIZES,	\
+	CLIENT_METRIC_TYPE_WRITE_IO_SIZES,	\
						\
	CLIENT_METRIC_TYPE_MAX,			\
}
-/* metric caps header */
-struct ceph_metric_cap {
+struct ceph_metric_header {
	__le32 type;     /* ceph metric type */
	__u8  ver;
	__u8  compat;
	__le32 data_len; /* length of sizeof(hit + mis + total) */
+} __packed;
+
+/* metric caps header */
+struct ceph_metric_cap {
+	struct ceph_metric_header header;
	__le64 hit;
	__le64 mis;
	__le64 total;
@@ -53,48 +59,28 @@ struct ceph_metric_cap {
/* metric read latency header */
struct ceph_metric_read_latency {
-	__le32 type;     /* ceph metric type */
-	__u8  ver;
-	__u8  compat;
-	__le32 data_len; /* length of sizeof(sec + nsec) */
+	struct ceph_metric_header header;
	__le32 sec;
	__le32 nsec;
} __packed;

/* metric write latency header */
struct ceph_metric_write_latency {
-	__le32 type;     /* ceph metric type */
-	__u8  ver;
-	__u8  compat;
-	__le32 data_len; /* length of sizeof(sec + nsec) */
+	struct ceph_metric_header header;
	__le32 sec;
	__le32 nsec;
} __packed;

/* metric metadata latency header */
struct ceph_metric_metadata_latency {
-	__le32 type;     /* ceph metric type */
-	__u8  ver;
-	__u8  compat;
-	__le32 data_len; /* length of sizeof(sec + nsec) */
+	struct ceph_metric_header header;
	__le32 sec;
	__le32 nsec;
} __packed;

/* metric dentry lease header */
struct ceph_metric_dlease {
-	__le32 type;     /* ceph metric type */
-	__u8  ver;
-	__u8  compat;
-	__le32 data_len; /* length of sizeof(hit + mis + total) */
+	struct ceph_metric_header header;
	__le64 hit;
	__le64 mis;
	__le64 total;
@@ -102,40 +88,39 @@ struct ceph_metric_dlease {
/* metric opened files header */
struct ceph_opened_files {
-	__le32 type;     /* ceph metric type */
-	__u8  ver;
-	__u8  compat;
-	__le32 data_len; /* length of sizeof(opened_files + total) */
+	struct ceph_metric_header header;
	__le64 opened_files;
	__le64 total;
} __packed;

/* metric pinned i_caps header */
struct ceph_pinned_icaps {
-	__le32 type;     /* ceph metric type */
-	__u8  ver;
-	__u8  compat;
-	__le32 data_len; /* length of sizeof(pinned_icaps + total) */
+	struct ceph_metric_header header;
	__le64 pinned_icaps;
	__le64 total;
} __packed;

/* metric opened inodes header */
struct ceph_opened_inodes {
-	__le32 type;     /* ceph metric type */
-	__u8  ver;
-	__u8  compat;
-	__le32 data_len; /* length of sizeof(opened_inodes + total) */
+	struct ceph_metric_header header;
	__le64 opened_inodes;
	__le64 total;
} __packed;
+/* metric read io size header */
+struct ceph_read_io_size {
+	struct ceph_metric_header header;
+	__le64 total_ops;
+	__le64 total_size;
+} __packed;
+
+/* metric write io size header */
+struct ceph_write_io_size {
+	struct ceph_metric_header header;
+	__le64 total_ops;
+	__le64 total_size;
+} __packed;

struct ceph_metric_head {
	__le32 num;	/* the number of metrics that will be sent */
} __packed;
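The two new wire structures carry only running totals; a consumer derives the average I/O size itself. A minimal sketch of that derivation, with hypothetical local names:

	/* Sketch: derive the average read size from the two counters. */
	u64 avg = 0;

	if (total_ops)
		avg = div64_u64(total_size, total_ops);

Per-client min/max sizes are tracked locally (see the read_size_min/max fields below) but are not part of this wire format.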
@@ -152,6 +137,9 @@ struct ceph_client_metric {
	spinlock_t read_metric_lock;
	u64 total_reads;
+	u64 read_size_sum;
+	u64 read_size_min;
+	u64 read_size_max;
	ktime_t read_latency_sum;
	ktime_t read_latency_sq_sum;
	ktime_t read_latency_min;
@@ -159,6 +147,9 @@ struct ceph_client_metric {
	spinlock_t write_metric_lock;
	u64 total_writes;
+	u64 write_size_sum;
+	u64 write_size_min;
+	u64 write_size_max;
	ktime_t write_latency_sum;
	ktime_t write_latency_sq_sum;
	ktime_t write_latency_min;
@@ -206,10 +197,10 @@ static inline void ceph_update_cap_mis(struct ceph_client_metric *m)
extern void ceph_update_read_metrics(struct ceph_client_metric *m,
				     ktime_t r_start, ktime_t r_end,
-				     int rc);
+				     unsigned int size, int rc);
extern void ceph_update_write_metrics(struct ceph_client_metric *m,
				      ktime_t r_start, ktime_t r_end,
-				      int rc);
+				      unsigned int size, int rc);
extern void ceph_update_metadata_metrics(struct ceph_client_metric *m,
					 ktime_t r_start, ktime_t r_end,
					 int rc);
...
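With the widened prototypes, read and write callers report the number of bytes transferred alongside the start/end timestamps; the metadata path is unchanged since it has no meaningful size. An illustrative call site (all names hypothetical except the function itself):

	/* Record a successful 64 KiB read that began at 'start'. */
	ceph_update_read_metrics(&mdsc->metric, start, ktime_get(),
				 65536, 0);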
@@ -74,8 +74,7 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc,
			   le64_to_cpu(h->max_files));
	spin_unlock(&ci->i_ceph_lock);

-	/* avoid calling iput_final() in dispatch thread */
-	ceph_async_iput(inode);
+	iput(inode);
}

static struct ceph_quotarealm_inode *
@@ -247,8 +246,7 @@ static struct ceph_snap_realm *get_quota_realm(struct ceph_mds_client *mdsc,
		ci = ceph_inode(in);
		has_quota = __ceph_has_any_quota(ci);
-		/* avoid calling iput_final() while holding mdsc->snap_rwsem */
-		ceph_async_iput(in);
+		iput(in);

		next = realm->parent;
		if (has_quota || !next)
@@ -383,8 +381,7 @@ static bool check_quota_exceeded(struct inode *inode, enum quota_check_op op,
			pr_warn("Invalid quota check op (%d)\n", op);
			exceeded = true; /* Just break the loop */
		}
-		/* avoid calling iput_final() while holding mdsc->snap_rwsem */
-		ceph_async_iput(in);
+		iput(in);

		next = realm->parent;
		if (exceeded || !next)
...
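For context, the ceph_async_iput() being removed here deferred a potentially-final iput() to a workqueue so it could be called safely from MDS dispatch threads or under mdsc->snap_rwsem. A rough reconstruction of the removed helper, pieced together from its call sites and not verbatim from the tree:

	/* Rough sketch of the removed helper; not verbatim. */
	void ceph_async_iput(struct inode *inode)
	{
		if (!inode)
			return;

		for (;;) {
			/* fast path: this is not the last reference */
			if (atomic_add_unless(&inode->i_count, -1, 1))
				break;
			/* last ref: punt the blocking iput() to a workqueue */
			if (queue_work(ceph_inode_to_client(inode)->inode_wq,
				       &ceph_inode(inode)->i_work))
				break;
		}
	}

It becomes unnecessary once the earlier patches in this series stop holding session->s_mutex and snap_rwsem across these paths, so a plain iput() suffices.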
@@ -60,11 +60,13 @@
/*
 * increase ref count for the realm
 *
- * caller must hold snap_rwsem for write.
+ * caller must hold snap_rwsem.
 */
void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
			 struct ceph_snap_realm *realm)
{
+	lockdep_assert_held(&mdsc->snap_rwsem);
+
	dout("get_realm %p %d -> %d\n", realm,
	     atomic_read(&realm->nref), atomic_read(&realm->nref)+1);

	/*
@@ -113,6 +115,8 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
{
	struct ceph_snap_realm *realm;

+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
	realm = kzalloc(sizeof(*realm), GFP_NOFS);
	if (!realm)
		return ERR_PTR(-ENOMEM);
@@ -135,7 +139,7 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
/*
 * lookup the realm rooted at @ino.
 *
- * caller must hold snap_rwsem for write.
+ * caller must hold snap_rwsem.
 */
static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
						   u64 ino)
@@ -143,6 +147,8 @@ static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
	struct rb_node *n = mdsc->snap_realms.rb_node;
	struct ceph_snap_realm *r;

+	lockdep_assert_held(&mdsc->snap_rwsem);
+
	while (n) {
		r = rb_entry(n, struct ceph_snap_realm, node);
		if (ino < r->ino)
@@ -176,6 +182,8 @@ static void __put_snap_realm(struct ceph_mds_client *mdsc,
static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
				 struct ceph_snap_realm *realm)
{
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
	dout("__destroy_snap_realm %p %llx\n", realm, realm->ino);

	rb_erase(&realm->node, &mdsc->snap_realms);
@@ -198,6 +206,8 @@ static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
static void __put_snap_realm(struct ceph_mds_client *mdsc,
			     struct ceph_snap_realm *realm)
{
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
	dout("__put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
	     atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
	if (atomic_dec_and_test(&realm->nref))
@@ -236,6 +246,8 @@ static void __cleanup_empty_realms(struct ceph_mds_client *mdsc)
{
	struct ceph_snap_realm *realm;

+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
	spin_lock(&mdsc->snap_empty_lock);
	while (!list_empty(&mdsc->snap_empty)) {
		realm = list_first_entry(&mdsc->snap_empty,
@@ -269,6 +281,8 @@ static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc,
{
	struct ceph_snap_realm *parent;

+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
	if (realm->parent_ino == parentino)
		return 0;

@@ -460,7 +474,7 @@ static bool has_new_snaps(struct ceph_snap_context *o,
 * Caller must hold snap_rwsem for read (i.e., the realm topology won't
 * change).
 */
-void ceph_queue_cap_snap(struct ceph_inode_info *ci)
+static void ceph_queue_cap_snap(struct ceph_inode_info *ci)
{
	struct inode *inode = &ci->vfs_inode;
	struct ceph_cap_snap *capsnap;
@@ -663,15 +677,13 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
		if (!inode)
			continue;
		spin_unlock(&realm->inodes_with_caps_lock);
-		/* avoid calling iput_final() while holding
-		 * mdsc->snap_rwsem or in mds dispatch threads */
-		ceph_async_iput(lastinode);
+		iput(lastinode);
		lastinode = inode;
		ceph_queue_cap_snap(ci);
		spin_lock(&realm->inodes_with_caps_lock);
	}
	spin_unlock(&realm->inodes_with_caps_lock);
-	ceph_async_iput(lastinode);
+	iput(lastinode);

	dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino);
}
@@ -696,6 +708,8 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
	int err = -ENOMEM;
	LIST_HEAD(dirty_realms);

+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
	dout("update_snap_trace deletion=%d\n", deletion);
more:
	ceph_decode_need(&p, e, sizeof(*ri), bad);
@@ -791,7 +805,7 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
	return 0;

bad:
-	err = -EINVAL;
+	err = -EIO;
fail:
	if (realm && !IS_ERR(realm))
		ceph_put_snap_realm(mdsc, realm);
@@ -823,17 +837,12 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
		ihold(inode);
		spin_unlock(&mdsc->snap_flush_lock);
		ceph_flush_snaps(ci, &session);
-		/* avoid calling iput_final() while holding
-		 * session->s_mutex or in mds dispatch threads */
-		ceph_async_iput(inode);
+		iput(inode);
		spin_lock(&mdsc->snap_flush_lock);
	}
	spin_unlock(&mdsc->snap_flush_lock);

-	if (session) {
-		mutex_unlock(&session->s_mutex);
-		ceph_put_mds_session(session);
-	}
+	ceph_put_mds_session(session);
	dout("flush_snaps done\n");
}

@@ -969,14 +978,12 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
			ceph_get_snap_realm(mdsc, realm);
			ceph_put_snap_realm(mdsc, oldrealm);

-			/* avoid calling iput_final() while holding
-			 * mdsc->snap_rwsem or mds in dispatch threads */
-			ceph_async_iput(inode);
+			iput(inode);
			continue;

skip_inode:
		spin_unlock(&ci->i_ceph_lock);
-		ceph_async_iput(inode);
+		iput(inode);
	}

	/* we may have taken some of the old realm's children. */
...
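The new assertions document the locking contract at runtime: lockdep_assert_held() is satisfied by either read or write ownership of snap_rwsem, while lockdep_assert_held_write() demands write ownership; both compile away when CONFIG_LOCKDEP is off. Illustrative usage, as a sketch:

	/* Readers may look up realms under the read lock... */
	down_read(&mdsc->snap_rwsem);
	realm = __lookup_snap_realm(mdsc, ino);	/* assert_held() passes */
	up_read(&mdsc->snap_rwsem);

	/* ...but creating or destroying realms still needs the write lock. */
	down_write(&mdsc->snap_rwsem);
	realm = ceph_create_snap_realm(mdsc, ino);
	up_write(&mdsc->snap_rwsem);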
@@ -931,7 +931,6 @@ extern int ceph_update_snap_trace(struct ceph_mds_client *m,
extern void ceph_handle_snap(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session,
			     struct ceph_msg *msg);
-extern void ceph_queue_cap_snap(struct ceph_inode_info *ci);
extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
				  struct ceph_cap_snap *capsnap);
extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
@@ -989,8 +988,6 @@ extern int ceph_inode_holds_cap(struct inode *inode, int mask);
extern bool ceph_inode_set_size(struct inode *inode, loff_t size);
extern void __ceph_do_pending_vmtruncate(struct inode *inode);

-extern void ceph_async_iput(struct inode *inode);
-
void ceph_queue_inode_work(struct inode *inode, int work_bit);

static inline void ceph_queue_vmtruncate(struct inode *inode)
...
@@ -58,12 +58,10 @@ struct ceph_auth_client *ceph_auth_init(const char *name,
					const int *con_modes)
{
	struct ceph_auth_client *ac;
-	int ret;

-	ret = -ENOMEM;
	ac = kzalloc(sizeof(*ac), GFP_NOFS);
	if (!ac)
-		goto out;
+		return ERR_PTR(-ENOMEM);

	mutex_init(&ac->mutex);
	ac->negotiating = true;
@@ -78,9 +76,6 @@ struct ceph_auth_client *ceph_auth_init(const char *name,
	dout("%s name '%s' preferred_mode %d fallback_mode %d\n", __func__,
	     ac->name, ac->preferred_mode, ac->fallback_mode);
	return ac;
-
-out:
-	return ERR_PTR(ret);
}

void ceph_auth_destroy(struct ceph_auth_client *ac)
...
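Since ceph_auth_init() already reported failure via ERR_PTR(), dropping the ret variable changes nothing for callers, which keep the usual pattern (argument names illustrative):

	/* Typical caller-side handling of the ERR_PTR return. */
	ac = ceph_auth_init(name, key, con_modes);
	if (IS_ERR(ac))
		return PTR_ERR(ac);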
@@ -112,8 +112,8 @@ static int ceph_auth_none_create_authorizer(
	auth->authorizer = (struct ceph_authorizer *) au;
	auth->authorizer_buf = au->buf;
	auth->authorizer_buf_len = au->buf_len;
-	auth->authorizer_reply_buf = au->reply_buf;
-	auth->authorizer_reply_buf_len = sizeof (au->reply_buf);
+	auth->authorizer_reply_buf = NULL;
+	auth->authorizer_reply_buf_len = 0;

	return 0;
}
...
@@ -16,7 +16,6 @@ struct ceph_none_authorizer {
	struct ceph_authorizer base;
	char buf[128];
	int buf_len;
-	char reply_buf[0];
};

struct ceph_auth_none_info {
...
@@ -10,7 +10,9 @@
/**
 * ceph_cls_lock - grab rados lock for object
- * @oid, @oloc: object to lock
+ * @osdc: OSD client instance
+ * @oid: object to lock
+ * @oloc: object to lock
 * @lock_name: the name of the lock
 * @type: lock type (CEPH_CLS_LOCK_EXCLUSIVE or CEPH_CLS_LOCK_SHARED)
 * @cookie: user-defined identifier for this instance of the lock
@@ -82,7 +84,9 @@ EXPORT_SYMBOL(ceph_cls_lock);
/**
 * ceph_cls_unlock - release rados lock for object
- * @oid, @oloc: object to lock
+ * @osdc: OSD client instance
+ * @oid: object to lock
+ * @oloc: object to lock
 * @lock_name: the name of the lock
 * @cookie: user-defined identifier for this instance of the lock
 */
@@ -130,7 +134,9 @@ EXPORT_SYMBOL(ceph_cls_unlock);
/**
 * ceph_cls_break_lock - release rados lock for object for specified client
- * @oid, @oloc: object to lock
+ * @osdc: OSD client instance
+ * @oid: object to lock
+ * @oloc: object to lock
 * @lock_name: the name of the lock
 * @cookie: user-defined identifier for this instance of the lock
 * @locker: current lock owner
...
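These are pure kernel-doc fixes: scripts/kernel-doc expects exactly one '@name:' line per parameter, so the combined '@oid, @oloc:' form and the missing '@osdc:' entry produced documentation warnings (typically surfaced by W=1 builds). Generic shape of a clean kernel-doc block, with an illustrative function:

	/**
	 * my_func - one-line summary
	 * @a: each parameter gets its own @name: line
	 * @b: combined forms such as "@a, @b:" are not parsed
	 *
	 * Return: 0 on success, negative errno on failure.
	 */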