Commit fe5da05e authored by Ilya Dryomov

libceph: redo callbacks and factor out MOSDOpReply decoding

If you specify ACK | ONDISK and set ->r_unsafe_callback, both
->r_callback and ->r_unsafe_callback(true) are called on ack.  This is
very confusing.  Redo this so that only one of them is called:

    ->r_unsafe_callback(true), on ack
    ->r_unsafe_callback(false), on commit

or

    ->r_callback, on ack|commit

Decode everything in decode_MOSDOpReply() to reduce clutter.
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
parent 85e084fe
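
As a quick illustration of the reworked contract (a hedged sketch, not code from this commit -- my_write_done and the setup fragment are hypothetical), a submitter that wants both notifications now relies on ->r_unsafe_callback alone:

    /* sketch: the request wants both an ack and a commit notification */
    static void my_write_done(struct ceph_osd_request *req, bool unsafe)
    {
            if (unsafe) {
                    /* first reply (ack): data is in the OSD's memory/journal */
            } else {
                    /* second reply (commit): data is on stable storage */
            }
    }

    /* ... during request setup ... */
    req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ACK |
                   CEPH_OSD_FLAG_ONDISK;
    req->r_unsafe_callback = my_write_done;  /* fires (true), then (false) */
    /* with only ->r_callback set, it would fire once, on whichever reply
     * completes the request: the ack for reads, the commit for ONDISK
     * writes */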
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -1765,8 +1765,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
 		goto out_unlock;
 	}
 
-	wr_req->r_flags = CEPH_OSD_FLAG_WRITE |
-			  CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+	wr_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ACK;
 	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
 	ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
 	ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -770,6 +770,8 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
 		list_add_tail(&req->r_unsafe_item,
 			      &ci->i_unsafe_writes);
 		spin_unlock(&ci->i_unsafe_lock);
+
+		complete_all(&req->r_completion);
 	} else {
 		spin_lock(&ci->i_unsafe_lock);
 		list_del_init(&req->r_unsafe_item);
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -162,13 +162,14 @@ struct ceph_osd_request {
 	unsigned int		r_num_ops;
 
 	int               r_result;
-	int               r_got_reply;
+	bool              r_got_reply;
 	int		  r_linger;
 
 	struct ceph_osd_client *r_osdc;
 	struct kref       r_kref;
 	bool              r_mempool;
-	struct completion r_completion, r_safe_completion;
+	struct completion r_completion;
+	struct completion r_safe_completion;  /* fsync waiter */
 	ceph_osdc_callback_t r_callback;
 	ceph_osdc_unsafe_callback_t r_unsafe_callback;
 	struct list_head  r_unsafe_item;
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1693,6 +1693,14 @@ static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
 	return 0;
 }
 
+static void __complete_request(struct ceph_osd_request *req)
+{
+	if (req->r_callback)
+		req->r_callback(req);
+	else
+		complete_all(&req->r_completion);
+}
+
 /*
  * Timeout callback, called every N seconds when 1 or more osd
  * requests has been active for more than N seconds.  When this
@@ -1875,107 +1883,76 @@ static int ceph_redirect_decode(void **p, void *end,
 	goto out;
 }
 
-static void complete_request(struct ceph_osd_request *req)
-{
-	complete_all(&req->r_safe_completion);  /* fsync waiter */
-}
+struct MOSDOpReply {
+	struct ceph_pg pgid;
+	u64 flags;
+	int result;
+	u32 epoch;
+	int num_ops;
+	u32 outdata_len[CEPH_OSD_MAX_OPS];
+	s32 rval[CEPH_OSD_MAX_OPS];
+	int retry_attempt;
+	struct ceph_eversion replay_version;
+	u64 user_version;
+	struct ceph_request_redirect redirect;
+};
 
-/*
- * handle osd op reply.  either call the callback if it is specified,
- * or do the completion to wake up the waiting thread.
- */
-static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m)
 {
-	void *p, *end;
-	struct ceph_osd_request *req;
-	struct ceph_request_redirect redir;
-	u64 tid;
-	int object_len;
-	unsigned int numops;
-	int payload_len, flags;
-	s32 result;
-	s32 retry_attempt;
-	struct ceph_pg pg;
-	int err;
-	u32 reassert_epoch;
-	u64 reassert_version;
-	u32 osdmap_epoch;
-	int already_completed;
-	u32 bytes;
+	void *p = msg->front.iov_base;
+	void *const end = p + msg->front.iov_len;
+	u16 version = le16_to_cpu(msg->hdr.version);
+	struct ceph_eversion bad_replay_version;
 	u8 decode_redir;
-	unsigned int i;
+	u32 len;
+	int ret;
+	int i;
 
-	tid = le64_to_cpu(msg->hdr.tid);
-	dout("handle_reply %p tid %llu\n", msg, tid);
-
-	p = msg->front.iov_base;
-	end = p + msg->front.iov_len;
-
-	ceph_decode_need(&p, end, 4, bad);
-	object_len = ceph_decode_32(&p);
-	ceph_decode_need(&p, end, object_len, bad);
-	p += object_len;
+	ceph_decode_32_safe(&p, end, len, e_inval);
+	ceph_decode_need(&p, end, len, e_inval);
+	p += len; /* skip oid */
 
-	err = ceph_decode_pgid(&p, end, &pg);
-	if (err)
-		goto bad;
+	ret = ceph_decode_pgid(&p, end, &m->pgid);
+	if (ret)
+		return ret;
 
-	ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
-	flags = ceph_decode_64(&p);
-	result = ceph_decode_32(&p);
-	reassert_epoch = ceph_decode_32(&p);
-	reassert_version = ceph_decode_64(&p);
-	osdmap_epoch = ceph_decode_32(&p);
+	ceph_decode_64_safe(&p, end, m->flags, e_inval);
+	ceph_decode_32_safe(&p, end, m->result, e_inval);
+	ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval);
+	memcpy(&bad_replay_version, p, sizeof(bad_replay_version));
+	p += sizeof(bad_replay_version);
+	ceph_decode_32_safe(&p, end, m->epoch, e_inval);
 
-	/* lookup */
-	down_read(&osdc->map_sem);
-	mutex_lock(&osdc->request_mutex);
-	req = lookup_request(&osdc->requests, tid);
-	if (req == NULL) {
-		dout("handle_reply tid %llu dne\n", tid);
-		goto bad_mutex;
-	}
-	ceph_osdc_get_request(req);
-
-	dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
-	     req, result);
-
-	ceph_decode_need(&p, end, 4, bad_put);
-	numops = ceph_decode_32(&p);
-	if (numops > CEPH_OSD_MAX_OPS)
-		goto bad_put;
-	if (numops != req->r_num_ops)
-		goto bad_put;
-	payload_len = 0;
-	ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put);
-	for (i = 0; i < numops; i++) {
+	ceph_decode_32_safe(&p, end, m->num_ops, e_inval);
+	if (m->num_ops > ARRAY_SIZE(m->outdata_len))
+		goto e_inval;
+
+	ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op),
+			 e_inval);
+	for (i = 0; i < m->num_ops; i++) {
 		struct ceph_osd_op *op = p;
-		int len;
 
-		len = le32_to_cpu(op->payload_len);
-		req->r_ops[i].outdata_len = len;
-		dout(" op %d has %d bytes\n", i, len);
-		payload_len += len;
+		m->outdata_len[i] = le32_to_cpu(op->payload_len);
 		p += sizeof(*op);
 	}
-	bytes = le32_to_cpu(msg->hdr.data_len);
-	if (payload_len != bytes) {
-		pr_warn("sum of op payload lens %d != data_len %d\n",
-			payload_len, bytes);
-		goto bad_put;
-	}
 
-	ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
-	retry_attempt = ceph_decode_32(&p);
-	for (i = 0; i < numops; i++)
-		req->r_ops[i].rval = ceph_decode_32(&p);
+	ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval);
+	for (i = 0; i < m->num_ops; i++)
+		ceph_decode_32_safe(&p, end, m->rval[i], e_inval);
 
-	if (le16_to_cpu(msg->hdr.version) >= 6) {
-		p += 8 + 4; /* skip replay_version */
-		p += 8; /* skip user_version */
+	if (version >= 5) {
+		ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval);
+		memcpy(&m->replay_version, p, sizeof(m->replay_version));
+		p += sizeof(m->replay_version);
+		ceph_decode_64_safe(&p, end, m->user_version, e_inval);
+	} else {
+		m->replay_version = bad_replay_version; /* struct */
+		m->user_version = le64_to_cpu(m->replay_version.version);
+	}
 
-		if (le16_to_cpu(msg->hdr.version) >= 7)
-			ceph_decode_8_safe(&p, end, decode_redir, bad_put);
+	if (version >= 6) {
+		if (version >= 7)
+			ceph_decode_8_safe(&p, end, decode_redir, e_inval);
 		else
 			decode_redir = 1;
 	} else {
@@ -1983,19 +1960,96 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 	}
 
 	if (decode_redir) {
-		err = ceph_redirect_decode(&p, end, &redir);
-		if (err)
-			goto bad_put;
+		ret = ceph_redirect_decode(&p, end, &m->redirect);
+		if (ret)
+			return ret;
 	} else {
-		redir.oloc.pool = -1;
+		ceph_oloc_init(&m->redirect.oloc);
 	}
 
-	if (!ceph_oloc_empty(&redir.oloc)) {
-		dout("redirect pool %lld\n", redir.oloc.pool);
+	return 0;
+
+e_inval:
+	return -EINVAL;
+}
+
+/*
+ * We are done with @req if
+ * - @m is a safe reply, or
+ * - @m is an unsafe reply and we didn't want a safe one
+ */
+static bool done_request(const struct ceph_osd_request *req,
+			 const struct MOSDOpReply *m)
+{
+	return (m->result < 0 ||
+		(m->flags & CEPH_OSD_FLAG_ONDISK) ||
+		!(req->r_flags & CEPH_OSD_FLAG_ONDISK));
+}
+
+/*
+ * handle osd op reply.  either call the callback if it is specified,
+ * or do the completion to wake up the waiting thread.
+ *
+ * ->r_unsafe_callback is set?	yes			no
+ *
+ * first reply is OK (needed	r_cb/r_completion,	r_cb/r_completion,
+ * any or needed/got safe)	r_safe_completion	r_safe_completion
+ *
+ * first reply is unsafe	r_unsafe_cb(true)	(nothing)
+ *
+ * when we get the safe reply	r_unsafe_cb(false),	r_cb/r_completion,
+ *				r_safe_completion	r_safe_completion
+ */
+static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+{
+	struct ceph_osd_request *req;
+	struct MOSDOpReply m;
+	u64 tid = le64_to_cpu(msg->hdr.tid);
+	u32 data_len = 0;
+	bool already_acked;
+	int ret;
+	int i;
+
+	dout("%s msg %p tid %llu\n", __func__, msg, tid);
+
+	down_read(&osdc->map_sem);
+	mutex_lock(&osdc->request_mutex);
+	req = lookup_request(&osdc->requests, tid);
+	if (!req) {
+		dout("%s no tid %llu\n", __func__, tid);
+		goto out_unlock;
+	}
+	ceph_osdc_get_request(req);
+
+	ret = decode_MOSDOpReply(msg, &m);
+	if (ret) {
+		pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
+		       req->r_tid, ret);
+		ceph_msg_dump(msg);
+		goto fail_request;
+	}
+	dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n",
+	     __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed,
+	     m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch),
+	     le64_to_cpu(m.replay_version.version), m.user_version);
+
+	if (m.retry_attempt >= 0) {
+		if (m.retry_attempt != req->r_attempts - 1) {
+			dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
+			     req, req->r_tid, m.retry_attempt,
+			     req->r_attempts - 1);
+			goto out_put;
+		}
+	} else {
+		WARN_ON(1); /* MOSDOpReply v4 is assumed */
+	}
+
+	if (!ceph_oloc_empty(&m.redirect.oloc)) {
+		dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
+		     m.redirect.oloc.pool);
 		__unregister_request(osdc, req);
 
-		ceph_oloc_copy(&req->r_t.target_oloc, &redir.oloc);
+		ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc);
 
 		/*
 		 * Start redirect requests with nofail=true.  If
@@ -2005,85 +2059,85 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 		 * successfully.  In the future we might want to follow
 		 * original request's nofail setting here.
 		 */
-		err = __ceph_osdc_start_request(osdc, req, true);
-		BUG_ON(err);
+		ret = __ceph_osdc_start_request(osdc, req, true);
+		BUG_ON(ret);
 
-		goto out_unlock;
+		goto out_put;
 	}
 
-	already_completed = req->r_got_reply;
-	if (!req->r_got_reply) {
-		req->r_result = result;
-		dout("handle_reply result %d bytes %d\n", req->r_result,
-		     bytes);
-		if (req->r_result == 0)
-			req->r_result = bytes;
-
-		/* in case this is a write and we need to replay, */
-		req->r_replay_version.epoch = cpu_to_le32(reassert_epoch);
-		req->r_replay_version.version = cpu_to_le64(reassert_version);
-
-		req->r_got_reply = 1;
-	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
-		dout("handle_reply tid %llu dup ack\n", tid);
-		goto out_unlock;
+	if (m.num_ops != req->r_num_ops) {
+		pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
+		       req->r_num_ops, req->r_tid);
+		goto fail_request;
 	}
-
-	dout("handle_reply tid %llu flags %d\n", tid, flags);
-
-	if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
-		__register_linger_request(osdc, req);
-
-	/* either this is a read, or we got the safe response */
-	if (result < 0 ||
-	    (flags & CEPH_OSD_FLAG_ONDISK) ||
-	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
+	for (i = 0; i < req->r_num_ops; i++) {
+		dout(" req %p tid %llu op %d rval %d len %u\n", req,
+		     req->r_tid, i, m.rval[i], m.outdata_len[i]);
+		req->r_ops[i].rval = m.rval[i];
+		req->r_ops[i].outdata_len = m.outdata_len[i];
+		data_len += m.outdata_len[i];
+	}
+	if (data_len != le32_to_cpu(msg->hdr.data_len)) {
+		pr_err("sum of lens %u != %u for tid %llu\n", data_len,
+		       le32_to_cpu(msg->hdr.data_len), req->r_tid);
+		goto fail_request;
+	}
+	dout("%s req %p tid %llu acked %d result %d data_len %u\n", __func__,
+	     req, req->r_tid, req->r_got_reply, m.result, data_len);
+
+	already_acked = req->r_got_reply;
+	if (!already_acked) {
+		req->r_result = m.result ?: data_len;
+		req->r_replay_version = m.replay_version; /* struct */
+		req->r_got_reply = true;
+	} else if (!(m.flags & CEPH_OSD_FLAG_ONDISK)) {
+		dout("req %p tid %llu dup ack\n", req, req->r_tid);
+		goto out_put;
+	}
+
+	if (done_request(req, &m)) {
 		__unregister_request(osdc, req);
+		if (req->r_linger) {
+			WARN_ON(req->r_unsafe_callback);
+			__register_linger_request(osdc, req);
+		}
+	}
 
 	mutex_unlock(&osdc->request_mutex);
 	up_read(&osdc->map_sem);
 
-	if (!already_completed) {
-		if (req->r_unsafe_callback &&
-		    result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK))
-			req->r_unsafe_callback(req, true);
-		if (req->r_callback)
-			req->r_callback(req);
-		else
-			complete_all(&req->r_completion);
-	}
-
-	if (flags & CEPH_OSD_FLAG_ONDISK) {
-		if (req->r_unsafe_callback && already_completed)
-			req->r_unsafe_callback(req, false);
-		complete_request(req);
+	if (done_request(req, &m)) {
+		if (already_acked && req->r_unsafe_callback) {
+			dout("req %p tid %llu safe-cb\n", req, req->r_tid);
+			req->r_unsafe_callback(req, false);
+		} else {
+			dout("req %p tid %llu cb\n", req, req->r_tid);
+			__complete_request(req);
+		}
+	} else {
+		if (req->r_unsafe_callback) {
+			dout("req %p tid %llu unsafe-cb\n", req, req->r_tid);
+			req->r_unsafe_callback(req, true);
+		} else {
+			WARN_ON(1);
+		}
 	}
 
-out:
-	dout("req=%p req->r_linger=%d\n", req, req->r_linger);
+	if (m.flags & CEPH_OSD_FLAG_ONDISK)
+		complete_all(&req->r_safe_completion);
+
 	ceph_osdc_put_request(req);
 	return;
-out_unlock:
-	mutex_unlock(&osdc->request_mutex);
-	up_read(&osdc->map_sem);
-	goto out;
 
-bad_put:
+fail_request:
 	req->r_result = -EIO;
 	__unregister_request(osdc, req);
-	if (req->r_callback)
-		req->r_callback(req);
-	else
-		complete_all(&req->r_completion);
-	complete_request(req);
-	ceph_osdc_put_request(req);
-bad_mutex:
+	__complete_request(req);
+	complete_all(&req->r_safe_completion);
+out_put:
+	ceph_osdc_put_request(req);
+out_unlock:
 	mutex_unlock(&osdc->request_mutex);
 	up_read(&osdc->map_sem);
-bad:
-	pr_err("corrupt osd_op_reply got %d %d\n",
-	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
-	ceph_msg_dump(msg);
 }
 
 static void reset_changed_osds(struct ceph_osd_client *osdc)
@@ -2591,7 +2645,9 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
 	if (rc < 0) {
 		dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid);
 		ceph_osdc_cancel_request(req);
-		complete_request(req);
+
+		/* kludge - need to wake ceph_osdc_sync() */
+		complete_all(&req->r_safe_completion);
 		return rc;
 	}
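
decode_MOSDOpReply() leans on the ceph_decode_*_safe() helpers, which bounds-check every read against the end of the message front and bail out to a label (here e_inval) instead of reading past the buffer. A minimal user-space sketch of that pattern (illustrative only -- decode_u32() and the sample buffer are hypothetical, and the real helpers also byte-swap with le32_to_cpu()):

    #include <stddef.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    /* read a u32 only if enough bytes remain, otherwise fail */
    static int decode_u32(const uint8_t **p, const uint8_t *end, uint32_t *v)
    {
            if ((size_t)(end - *p) < sizeof(*v))
                    return -1;                /* truncated -> e_inval */
            memcpy(v, *p, sizeof(*v));        /* assumes little-endian host */
            *p += sizeof(*v);
            return 0;
    }

    int main(void)
    {
            /* a length-prefixed oid, like the front of an MOSDOpReply */
            const uint8_t buf[] = { 4, 0, 0, 0, 'o', 'b', 'j', '1' };
            const uint8_t *p = buf, *end = buf + sizeof(buf);
            uint32_t len;

            if (decode_u32(&p, end, &len) || (size_t)(end - p) < len)
                    return 1;                 /* corrupt reply: never overread */
            printf("oid: %.*s\n", (int)len, (const char *)p);
            return 0;
    }

The kernel macros jump to the supplied label rather than returning, which is what lets decode_MOSDOpReply() funnel every short read into the single e_inval exit.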