Commit 2cd698be authored by Yan, Zheng's avatar Yan, Zheng

ceph: handle cap import atomically

cap import messages are processed by both handle_cap_import() and
handle_cap_grant(). These two functions are not executed in the same
atomic context, so they can races with cap release.

The fix is make handle_cap_import() not release the i_ceph_lock when
it returns. Let handle_cap_grant() release the lock after it finishes
its job.
Signed-off-by: default avatarYan, Zheng <zheng.z.yan@intel.com>
parent d9df2783
...@@ -2379,23 +2379,20 @@ static void invalidate_aliases(struct inode *inode) ...@@ -2379,23 +2379,20 @@ static void invalidate_aliases(struct inode *inode)
* actually be a revocation if it specifies a smaller cap set.) * actually be a revocation if it specifies a smaller cap set.)
* *
* caller holds s_mutex and i_ceph_lock, we drop both. * caller holds s_mutex and i_ceph_lock, we drop both.
*
* return value:
* 0 - ok
* 1 - check_caps on auth cap only (writeback)
* 2 - check_caps (ack revoke)
*/ */
static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, static void handle_cap_grant(struct ceph_mds_client *mdsc,
struct inode *inode, struct ceph_mds_caps *grant,
void *snaptrace, int snaptrace_len,
struct ceph_buffer *xattr_buf,
struct ceph_mds_session *session, struct ceph_mds_session *session,
struct ceph_cap *cap, struct ceph_cap *cap, int issued)
struct ceph_buffer *xattr_buf) __releases(ci->i_ceph_lock)
__releases(ci->i_ceph_lock)
{ {
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds; int mds = session->s_mds;
int seq = le32_to_cpu(grant->seq); int seq = le32_to_cpu(grant->seq);
int newcaps = le32_to_cpu(grant->caps); int newcaps = le32_to_cpu(grant->caps);
int issued, implemented, used, wanted, dirty; int used, wanted, dirty;
u64 size = le64_to_cpu(grant->size); u64 size = le64_to_cpu(grant->size);
u64 max_size = le64_to_cpu(grant->max_size); u64 max_size = le64_to_cpu(grant->max_size);
struct timespec mtime, atime, ctime; struct timespec mtime, atime, ctime;
...@@ -2449,10 +2446,6 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ...@@ -2449,10 +2446,6 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
} }
/* side effects now are allowed */ /* side effects now are allowed */
issued = __ceph_caps_issued(ci, &implemented);
issued |= implemented | __ceph_caps_dirty(ci);
cap->cap_gen = session->s_cap_gen; cap->cap_gen = session->s_cap_gen;
cap->seq = seq; cap->seq = seq;
...@@ -2585,6 +2578,17 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ...@@ -2585,6 +2578,17 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
down_write(&mdsc->snap_rwsem);
ceph_update_snap_trace(mdsc, snaptrace,
snaptrace + snaptrace_len, false);
downgrade_write(&mdsc->snap_rwsem);
kick_flushing_inode_caps(mdsc, session, inode);
up_read(&mdsc->snap_rwsem);
if (newcaps & ~issued)
wake = 1;
}
if (queue_trunc) { if (queue_trunc) {
ceph_queue_vmtruncate(inode); ceph_queue_vmtruncate(inode);
ceph_queue_revalidate(inode); ceph_queue_revalidate(inode);
...@@ -2886,21 +2890,22 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, ...@@ -2886,21 +2890,22 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
} }
/* /*
* Handle cap IMPORT. If there are temp bits from an older EXPORT, * Handle cap IMPORT.
* clean them up.
* *
* caller holds s_mutex. * caller holds s_mutex. acquires i_ceph_lock
*/ */
static void handle_cap_import(struct ceph_mds_client *mdsc, static void handle_cap_import(struct ceph_mds_client *mdsc,
struct inode *inode, struct ceph_mds_caps *im, struct inode *inode, struct ceph_mds_caps *im,
struct ceph_mds_cap_peer *ph, struct ceph_mds_cap_peer *ph,
struct ceph_mds_session *session, struct ceph_mds_session *session,
void *snaptrace, int snaptrace_len) struct ceph_cap **target_cap, int *old_issued)
__acquires(ci->i_ceph_lock)
{ {
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_cap *cap, *new_cap = NULL; struct ceph_cap *cap, *ocap, *new_cap = NULL;
int mds = session->s_mds; int mds = session->s_mds;
unsigned issued = le32_to_cpu(im->caps); int issued;
unsigned caps = le32_to_cpu(im->caps);
unsigned wanted = le32_to_cpu(im->wanted); unsigned wanted = le32_to_cpu(im->wanted);
unsigned seq = le32_to_cpu(im->seq); unsigned seq = le32_to_cpu(im->seq);
unsigned mseq = le32_to_cpu(im->migrate_seq); unsigned mseq = le32_to_cpu(im->migrate_seq);
...@@ -2929,44 +2934,43 @@ static void handle_cap_import(struct ceph_mds_client *mdsc, ...@@ -2929,44 +2934,43 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
new_cap = ceph_get_cap(mdsc, NULL); new_cap = ceph_get_cap(mdsc, NULL);
goto retry; goto retry;
} }
cap = new_cap;
} else {
if (new_cap) {
ceph_put_cap(mdsc, new_cap);
new_cap = NULL;
}
} }
ceph_add_cap(inode, session, cap_id, -1, issued, wanted, seq, mseq, __ceph_caps_issued(ci, &issued);
issued |= __ceph_caps_dirty(ci);
ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq,
realmino, CEPH_CAP_FLAG_AUTH, &new_cap); realmino, CEPH_CAP_FLAG_AUTH, &new_cap);
cap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL; ocap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
if (cap && cap->cap_id == p_cap_id) { if (ocap && ocap->cap_id == p_cap_id) {
dout(" remove export cap %p mds%d flags %d\n", dout(" remove export cap %p mds%d flags %d\n",
cap, peer, ph->flags); ocap, peer, ph->flags);
if ((ph->flags & CEPH_CAP_FLAG_AUTH) && if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
(cap->seq != le32_to_cpu(ph->seq) || (ocap->seq != le32_to_cpu(ph->seq) ||
cap->mseq != le32_to_cpu(ph->mseq))) { ocap->mseq != le32_to_cpu(ph->mseq))) {
pr_err("handle_cap_import: mismatched seq/mseq: " pr_err("handle_cap_import: mismatched seq/mseq: "
"ino (%llx.%llx) mds%d seq %d mseq %d " "ino (%llx.%llx) mds%d seq %d mseq %d "
"importer mds%d has peer seq %d mseq %d\n", "importer mds%d has peer seq %d mseq %d\n",
ceph_vinop(inode), peer, cap->seq, ceph_vinop(inode), peer, ocap->seq,
cap->mseq, mds, le32_to_cpu(ph->seq), ocap->mseq, mds, le32_to_cpu(ph->seq),
le32_to_cpu(ph->mseq)); le32_to_cpu(ph->mseq));
} }
__ceph_remove_cap(cap, (ph->flags & CEPH_CAP_FLAG_RELEASE)); __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
} }
/* make sure we re-request max_size, if necessary */ /* make sure we re-request max_size, if necessary */
ci->i_wanted_max_size = 0; ci->i_wanted_max_size = 0;
ci->i_requested_max_size = 0; ci->i_requested_max_size = 0;
spin_unlock(&ci->i_ceph_lock);
wake_up_all(&ci->i_cap_wq);
down_write(&mdsc->snap_rwsem); *old_issued = issued;
ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len, *target_cap = cap;
false);
downgrade_write(&mdsc->snap_rwsem);
kick_flushing_inode_caps(mdsc, session, inode);
up_read(&mdsc->snap_rwsem);
if (new_cap)
ceph_put_cap(mdsc, new_cap);
} }
/* /*
...@@ -2986,7 +2990,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -2986,7 +2990,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_mds_caps *h; struct ceph_mds_caps *h;
struct ceph_mds_cap_peer *peer = NULL; struct ceph_mds_cap_peer *peer = NULL;
int mds = session->s_mds; int mds = session->s_mds;
int op; int op, issued;
u32 seq, mseq; u32 seq, mseq;
struct ceph_vino vino; struct ceph_vino vino;
u64 cap_id; u64 cap_id;
...@@ -3078,7 +3082,10 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3078,7 +3082,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
case CEPH_CAP_OP_IMPORT: case CEPH_CAP_OP_IMPORT:
handle_cap_import(mdsc, inode, h, peer, session, handle_cap_import(mdsc, inode, h, peer, session,
snaptrace, snaptrace_len); &cap, &issued);
handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len,
msg->middle, session, cap, issued);
goto done_unlocked;
} }
/* the rest require a cap */ /* the rest require a cap */
...@@ -3095,8 +3102,10 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3095,8 +3102,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
switch (op) { switch (op) {
case CEPH_CAP_OP_REVOKE: case CEPH_CAP_OP_REVOKE:
case CEPH_CAP_OP_GRANT: case CEPH_CAP_OP_GRANT:
case CEPH_CAP_OP_IMPORT: __ceph_caps_issued(ci, &issued);
handle_cap_grant(inode, h, session, cap, msg->middle); issued |= __ceph_caps_dirty(ci);
handle_cap_grant(mdsc, inode, h, NULL, 0, msg->middle,
session, cap, issued);
goto done_unlocked; goto done_unlocked;
case CEPH_CAP_OP_FLUSH_ACK: case CEPH_CAP_OP_FLUSH_ACK:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment