Commit ed9b430c authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: cleanup ceph_flush_snaps()

This patch devide __ceph_flush_snaps() into two stags. In the first
stage, __ceph_flush_snaps() assign snapcaps flush TIDs and add them
to cap flush lists. __ceph_flush_snaps() keeps holding the
i_ceph_lock in this stagge. So inode's auth cap can not change. In
the second stage, __ceph_flush_snaps() send flushsnap cap messages.
i_ceph_lock is unlocked before sending each cap message. If auth cap
changes in the middle, __ceph_flush_snaps() just stops. This is OK
because kick_flushing_inode_caps() will re-send flushsnap cap messages
to inode's new auth MDS.
Signed-off-by: default avatarYan, Zheng <zyan@redhat.com>
parent 7bc00fdd
...@@ -1247,32 +1247,20 @@ static inline int __send_flush_snap(struct inode *inode, ...@@ -1247,32 +1247,20 @@ static inline int __send_flush_snap(struct inode *inode,
* *
* Called under i_ceph_lock. Takes s_mutex as needed. * Called under i_ceph_lock. Takes s_mutex as needed.
*/ */
void __ceph_flush_snaps(struct ceph_inode_info *ci, static void __ceph_flush_snaps(struct ceph_inode_info *ci,
struct ceph_mds_session **psession) struct ceph_mds_session *session)
__releases(ci->i_ceph_lock) __releases(ci->i_ceph_lock)
__acquires(ci->i_ceph_lock) __acquires(ci->i_ceph_lock)
{ {
struct inode *inode = &ci->vfs_inode; struct inode *inode = &ci->vfs_inode;
int mds; struct ceph_mds_client *mdsc = session->s_mdsc;
struct ceph_cap_snap *capsnap; struct ceph_cap_snap *capsnap;
u32 mseq; u64 oldest_flush_tid = 0;
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; u64 first_tid = 1, last_tid = 0;
struct ceph_mds_session *session = NULL; /* if session != NULL, we hold
session->s_mutex */
u64 oldest_flush_tid;
u64 next_follows = 0; /* keep track of how far we've gotten through the
i_cap_snaps list, and skip these entries next time
around to avoid an infinite loop */
if (psession) dout("__flush_snaps %p session %p\n", inode, session);
session = *psession;
dout("__flush_snaps %p\n", inode);
retry:
list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
/* avoid an infiniute loop after retry */
if (capsnap->follows < next_follows)
continue;
/* /*
* we need to wait for sync writes to complete and for dirty * we need to wait for sync writes to complete and for dirty
* pages to be written out. * pages to be written out.
...@@ -1283,53 +1271,18 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci, ...@@ -1283,53 +1271,18 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci,
/* should be removed by ceph_try_drop_cap_snap() */ /* should be removed by ceph_try_drop_cap_snap() */
BUG_ON(!capsnap->need_flush); BUG_ON(!capsnap->need_flush);
/* pick mds, take s_mutex */
if (ci->i_auth_cap == NULL) {
dout("no auth cap (migrating?), doing nothing\n");
goto out;
}
/* only flush each capsnap once */ /* only flush each capsnap once */
if (capsnap->cap_flush.tid > 0) { if (capsnap->cap_flush.tid > 0) {
dout("already flushed %p, skipping\n", capsnap); dout(" already flushed %p, skipping\n", capsnap);
continue; continue;
} }
mds = ci->i_auth_cap->session->s_mds;
mseq = ci->i_auth_cap->mseq;
if (session && session->s_mds != mds) {
dout("oops, wrong session %p mutex\n", session);
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
session = NULL;
}
if (!session) {
spin_unlock(&ci->i_ceph_lock);
mutex_lock(&mdsc->mutex);
session = __ceph_lookup_mds_session(mdsc, mds);
mutex_unlock(&mdsc->mutex);
if (session) {
dout("inverting session/ino locks on %p\n",
session);
mutex_lock(&session->s_mutex);
}
/*
* if session == NULL, we raced against a cap
* deletion or migration. retry, and we'll
* get a better @mds value next time.
*/
spin_lock(&ci->i_ceph_lock);
goto retry;
}
spin_lock(&mdsc->cap_dirty_lock); spin_lock(&mdsc->cap_dirty_lock);
capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid; capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid;
list_add_tail(&capsnap->cap_flush.g_list, list_add_tail(&capsnap->cap_flush.g_list,
&mdsc->cap_flush_list); &mdsc->cap_flush_list);
oldest_flush_tid = __get_oldest_flush_tid(mdsc); if (oldest_flush_tid == 0)
oldest_flush_tid = __get_oldest_flush_tid(mdsc);
if (list_empty(&ci->i_flushing_item)) { if (list_empty(&ci->i_flushing_item)) {
list_add_tail(&ci->i_flushing_item, list_add_tail(&ci->i_flushing_item,
&session->s_cap_flushing); &session->s_cap_flushing);
...@@ -1339,41 +1292,108 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci, ...@@ -1339,41 +1292,108 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci,
list_add_tail(&capsnap->cap_flush.i_list, list_add_tail(&capsnap->cap_flush.i_list,
&ci->i_cap_flush_list); &ci->i_cap_flush_list);
if (first_tid == 1)
first_tid = capsnap->cap_flush.tid;
last_tid = capsnap->cap_flush.tid;
}
ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS;
while (first_tid <= last_tid) {
struct ceph_cap *cap = ci->i_auth_cap;
struct ceph_cap_flush *cf;
int ret;
if (!(cap && cap->session == session)) {
dout("__flush_snaps %p auth cap %p not mds%d, "
"stop\n", inode, cap, session->s_mds);
break;
}
ret = -ENOENT;
list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
if (cf->tid >= first_tid) {
ret = 0;
break;
}
}
if (ret < 0)
break;
first_tid = cf->tid + 1;
capsnap = container_of(cf, struct ceph_cap_snap, cap_flush);
atomic_inc(&capsnap->nref); atomic_inc(&capsnap->nref);
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n", dout("__flush_snaps %p capsnap %p tid %llu %s\n",
inode, capsnap, capsnap->follows, capsnap->cap_flush.tid); inode, capsnap, cf->tid, ceph_cap_string(capsnap->dirty));
__send_flush_snap(inode, session, capsnap, mseq,
oldest_flush_tid);
next_follows = capsnap->follows + 1; ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
ceph_put_cap_snap(capsnap); oldest_flush_tid);
if (ret < 0) {
pr_err("__flush_snaps: error sending cap flushsnap, "
"ino (%llx.%llx) tid %llu follows %llu\n",
ceph_vinop(inode), cf->tid, capsnap->follows);
}
ceph_put_cap_snap(capsnap);
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
goto retry;
} }
}
/* we flushed them all; remove this inode from the queue */ void ceph_flush_snaps(struct ceph_inode_info *ci,
spin_lock(&mdsc->snap_flush_lock); struct ceph_mds_session **psession)
list_del_init(&ci->i_snap_flush_item); {
spin_unlock(&mdsc->snap_flush_lock); struct inode *inode = &ci->vfs_inode;
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct ceph_mds_session *session = *psession;
int mds;
dout("ceph_flush_snaps %p\n", inode);
retry:
spin_lock(&ci->i_ceph_lock);
if (!(ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)) {
dout(" no capsnap needs flush, doing nothing\n");
goto out;
}
if (!ci->i_auth_cap) {
dout(" no auth cap (migrating?), doing nothing\n");
goto out;
}
out: mds = ci->i_auth_cap->session->s_mds;
if (psession) if (session && session->s_mds != mds) {
*psession = session; dout(" oops, wrong session %p mutex\n", session);
else if (session) {
mutex_unlock(&session->s_mutex); mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session); ceph_put_mds_session(session);
session = NULL;
}
if (!session) {
spin_unlock(&ci->i_ceph_lock);
mutex_lock(&mdsc->mutex);
session = __ceph_lookup_mds_session(mdsc, mds);
mutex_unlock(&mdsc->mutex);
if (session) {
dout(" inverting session/ino locks on %p\n", session);
mutex_lock(&session->s_mutex);
}
goto retry;
} }
}
static void ceph_flush_snaps(struct ceph_inode_info *ci) __ceph_flush_snaps(ci, session);
{ out:
spin_lock(&ci->i_ceph_lock);
__ceph_flush_snaps(ci, NULL);
ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS;
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
if (psession) {
*psession = session;
} else {
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
}
/* we flushed them all; remove this inode from the queue */
spin_lock(&mdsc->snap_flush_lock);
list_del_init(&ci->i_snap_flush_item);
spin_unlock(&mdsc->snap_flush_lock);
} }
/* /*
...@@ -1768,10 +1788,9 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ...@@ -1768,10 +1788,9 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
oldest_flush_tid); oldest_flush_tid);
ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
} }
if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) { if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
__ceph_flush_snaps(ci, &session); __ceph_flush_snaps(ci, session);
ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS;
}
goto retry_locked; goto retry_locked;
} }
...@@ -2610,7 +2629,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) ...@@ -2610,7 +2629,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
if (last && !flushsnaps) if (last && !flushsnaps)
ceph_check_caps(ci, 0, NULL); ceph_check_caps(ci, 0, NULL);
else if (flushsnaps) else if (flushsnaps)
ceph_flush_snaps(ci); ceph_flush_snaps(ci, NULL);
if (wake) if (wake)
wake_up_all(&ci->i_cap_wq); wake_up_all(&ci->i_cap_wq);
while (put-- > 0) while (put-- > 0)
...@@ -2691,7 +2710,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, ...@@ -2691,7 +2710,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
if (last) { if (last) {
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
} else if (flush_snaps) { } else if (flush_snaps) {
ceph_flush_snaps(ci); ceph_flush_snaps(ci, NULL);
} }
if (complete_capsnap) if (complete_capsnap)
wake_up_all(&ci->i_cap_wq); wake_up_all(&ci->i_cap_wq);
......
...@@ -799,9 +799,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc) ...@@ -799,9 +799,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
inode = &ci->vfs_inode; inode = &ci->vfs_inode;
ihold(inode); ihold(inode);
spin_unlock(&mdsc->snap_flush_lock); spin_unlock(&mdsc->snap_flush_lock);
spin_lock(&ci->i_ceph_lock); ceph_flush_snaps(ci, &session);
__ceph_flush_snaps(ci, &session);
spin_unlock(&ci->i_ceph_lock);
iput(inode); iput(inode);
spin_lock(&mdsc->snap_flush_lock); spin_lock(&mdsc->snap_flush_lock);
} }
......
...@@ -890,8 +890,8 @@ extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps); ...@@ -890,8 +890,8 @@ extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had); extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
struct ceph_snap_context *snapc); struct ceph_snap_context *snapc);
extern void __ceph_flush_snaps(struct ceph_inode_info *ci, extern void ceph_flush_snaps(struct ceph_inode_info *ci,
struct ceph_mds_session **psession); struct ceph_mds_session **psession);
extern void ceph_check_caps(struct ceph_inode_info *ci, int flags, extern void ceph_check_caps(struct ceph_inode_info *ci, int flags,
struct ceph_mds_session *session); struct ceph_mds_session *session);
extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc); extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment