Commit e1518c7c authored by Sage Weil

ceph: clean up mds reply, error handling

We would occasionally BUG out in the reply handler because r_reply was
nonzero, due to a race with ceph_mdsc_do_request temporarily setting
r_reply to an ERR_PTR value.  This is unnecessary, messy, and also wrong
in the EIO case.

Clean up by consistently using r_err for errors and r_reply for messages.
Also fix the abort logic to trigger consistently for all errors that return
to the caller early (e.g., EIO from timeout case).  If an abort races with
a reply, use the result from the reply.

Also fix locking for r_err, r_reply update in the reply handler.
Signed-off-by: Sage Weil <sage@newdream.net>
parent e40152ee
...@@ -1517,7 +1517,7 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc, ...@@ -1517,7 +1517,7 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
} }
msg = create_request_message(mdsc, req, mds); msg = create_request_message(mdsc, req, mds);
if (IS_ERR(msg)) { if (IS_ERR(msg)) {
req->r_reply = ERR_PTR(PTR_ERR(msg)); req->r_err = PTR_ERR(msg);
complete_request(mdsc, req); complete_request(mdsc, req);
return -PTR_ERR(msg); return -PTR_ERR(msg);
} }
...@@ -1552,7 +1552,7 @@ static int __do_request(struct ceph_mds_client *mdsc, ...@@ -1552,7 +1552,7 @@ static int __do_request(struct ceph_mds_client *mdsc,
int mds = -1; int mds = -1;
int err = -EAGAIN; int err = -EAGAIN;
if (req->r_reply) if (req->r_err || req->r_got_result)
goto out; goto out;
if (req->r_timeout && if (req->r_timeout &&
...@@ -1609,7 +1609,7 @@ static int __do_request(struct ceph_mds_client *mdsc, ...@@ -1609,7 +1609,7 @@ static int __do_request(struct ceph_mds_client *mdsc,
return err; return err;
finish: finish:
req->r_reply = ERR_PTR(err); req->r_err = err;
complete_request(mdsc, req); complete_request(mdsc, req);
goto out; goto out;
} }
...@@ -1689,59 +1689,53 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, ...@@ -1689,59 +1689,53 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
__register_request(mdsc, req, dir); __register_request(mdsc, req, dir);
__do_request(mdsc, req); __do_request(mdsc, req);
/* wait */ if (req->r_err) {
if (!req->r_reply) { err = req->r_err;
mutex_unlock(&mdsc->mutex); __unregister_request(mdsc, req);
if (req->r_timeout) { dout("do_request early error %d\n", err);
err = (long)wait_for_completion_interruptible_timeout( goto out;
&req->r_completion, req->r_timeout);
if (err == 0)
req->r_reply = ERR_PTR(-EIO);
else if (err < 0)
req->r_reply = ERR_PTR(err);
} else {
err = wait_for_completion_interruptible(
&req->r_completion);
if (err)
req->r_reply = ERR_PTR(err);
}
mutex_lock(&mdsc->mutex);
} }
if (IS_ERR(req->r_reply)) { /* wait */
err = PTR_ERR(req->r_reply); mutex_unlock(&mdsc->mutex);
req->r_reply = NULL; dout("do_request waiting\n");
if (req->r_timeout) {
if (err == -ERESTARTSYS) { err = (long)wait_for_completion_interruptible_timeout(
/* aborted */ &req->r_completion, req->r_timeout);
req->r_aborted = true; if (err == 0)
err = -EIO;
} else {
err = wait_for_completion_interruptible(&req->r_completion);
}
dout("do_request waited, got %d\n", err);
mutex_lock(&mdsc->mutex);
if (req->r_locked_dir && /* only abort if we didn't race with a real reply */
(req->r_op & CEPH_MDS_OP_WRITE)) { if (req->r_got_result) {
struct ceph_inode_info *ci = err = le32_to_cpu(req->r_reply_info.head->result);
ceph_inode(req->r_locked_dir); } else if (err < 0) {
dout("aborted request %lld with %d\n", req->r_tid, err);
req->r_err = err;
req->r_aborted = true;
dout("aborted, clearing I_COMPLETE on %p\n", if (req->r_locked_dir &&
req->r_locked_dir); (req->r_op & CEPH_MDS_OP_WRITE)) {
spin_lock(&req->r_locked_dir->i_lock); struct ceph_inode_info *ci =
ci->i_ceph_flags &= ~CEPH_I_COMPLETE; ceph_inode(req->r_locked_dir);
ci->i_release_count++;
spin_unlock(&req->r_locked_dir->i_lock); dout("aborted, clearing I_COMPLETE on %p\n",
} req->r_locked_dir);
} else { spin_lock(&req->r_locked_dir->i_lock);
/* clean up this request */ ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
__unregister_request(mdsc, req); ci->i_release_count++;
if (!list_empty(&req->r_unsafe_item)) spin_unlock(&req->r_locked_dir->i_lock);
list_del_init(&req->r_unsafe_item);
complete(&req->r_safe_completion);
} }
} else if (req->r_err) {
err = req->r_err;
} else { } else {
err = le32_to_cpu(req->r_reply_info.head->result); err = req->r_err;
} }
mutex_unlock(&mdsc->mutex);
out:
mutex_unlock(&mdsc->mutex);
dout("do_request %p done, result %d\n", req, err); dout("do_request %p done, result %d\n", req, err);
return err; return err;
} }
...@@ -1838,11 +1832,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) ...@@ -1838,11 +1832,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
mutex_unlock(&mdsc->mutex); mutex_unlock(&mdsc->mutex);
goto out; goto out;
} }
} } else {
BUG_ON(req->r_reply);
if (!head->safe) {
req->r_got_unsafe = true; req->r_got_unsafe = true;
list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
} }
...@@ -1880,12 +1870,19 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) ...@@ -1880,12 +1870,19 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
up_read(&mdsc->snap_rwsem); up_read(&mdsc->snap_rwsem);
out_err: out_err:
if (err) { mutex_lock(&mdsc->mutex);
req->r_err = err; if (!req->r_aborted) {
if (err) {
req->r_err = err;
} else {
req->r_reply = msg;
ceph_msg_get(msg);
req->r_got_result = true;
}
} else { } else {
req->r_reply = msg; dout("reply arrived after request %lld was aborted\n", tid);
ceph_msg_get(msg);
} }
mutex_unlock(&mdsc->mutex);
add_cap_releases(mdsc, req->r_session, -1); add_cap_releases(mdsc, req->r_session, -1);
mutex_unlock(&session->s_mutex); mutex_unlock(&session->s_mutex);
......
...@@ -213,7 +213,7 @@ struct ceph_mds_request { ...@@ -213,7 +213,7 @@ struct ceph_mds_request {
struct completion r_safe_completion; struct completion r_safe_completion;
ceph_mds_request_callback_t r_callback; ceph_mds_request_callback_t r_callback;
struct list_head r_unsafe_item; /* per-session unsafe list item */ struct list_head r_unsafe_item; /* per-session unsafe list item */
bool r_got_unsafe, r_got_safe; bool r_got_unsafe, r_got_safe, r_got_result;
bool r_did_prepopulate; bool r_did_prepopulate;
u32 r_readdir_offset; u32 r_readdir_offset;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment