Commit bb873b53 authored by Ilya Dryomov's avatar Ilya Dryomov

libceph: switch to calc_target(), part 2

The crux of this is getting rid of ceph_osdc_build_request(), so that
MOSDOp can be encoded not before but after calc_target() calculates the
actual target.  Encoding now happens within ceph_osdc_start_request().

Also nuked is the accompanying bunch of pointers into the encoded
buffer that was used to update fields on each send - instead, the
entire front is re-encoded.  If we want to support target->name_len !=
base->name_len in the future, there is no other way, because oid is
surrounded by other fields in the encoded buffer.

Encoding OSD ops and adding data items to the request message were
mixed together in osd_req_encode_op().  While we want to re-encode OSD
ops, we don't want to add duplicate data items to the message when
resending, so all call to ceph_osdc_msg_data_add() are factored out
into a new setup_request_data().
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent a66dd383
......@@ -1896,27 +1896,17 @@ static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
struct rbd_img_request *img_request = obj_request->img_request;
struct ceph_osd_request *osd_req = obj_request->osd_req;
u64 snap_id;
rbd_assert(osd_req != NULL);
snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
ceph_osdc_build_request(osd_req, obj_request->offset,
NULL, snap_id, NULL);
if (img_request)
osd_req->r_snapid = img_request->snap_id;
}
static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
struct rbd_img_request *img_request = obj_request->img_request;
struct ceph_osd_request *osd_req = obj_request->osd_req;
struct ceph_snap_context *snapc;
struct timespec mtime = CURRENT_TIME;
rbd_assert(osd_req != NULL);
snapc = img_request ? img_request->snapc : NULL;
ceph_osdc_build_request(osd_req, obj_request->offset,
snapc, CEPH_NOSNAP, &mtime);
osd_req->r_mtime = CURRENT_TIME;
osd_req->r_data_offset = obj_request->offset;
}
/*
......
......@@ -376,8 +376,6 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
req->r_callback = finish_read;
req->r_inode = inode;
ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
ret = ceph_osdc_start_request(osdc, req, false);
if (ret < 0)
......@@ -1063,10 +1061,7 @@ static int ceph_writepages_start(struct address_space *mapping,
pages = NULL;
}
vino = ceph_vino(inode);
ceph_osdc_build_request(req, offset, snapc, vino.snap,
&inode->i_mtime);
req->r_mtime = inode->i_mtime;
rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
BUG_ON(rc);
req = NULL;
......@@ -1614,7 +1609,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
goto out;
}
ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
req->r_mtime = inode->i_mtime;
err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!err)
err = ceph_osdc_wait_request(&fsc->client->osdc, req);
......@@ -1657,7 +1652,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
goto out_put;
}
ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
req->r_mtime = inode->i_mtime;
err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!err)
err = ceph_osdc_wait_request(&fsc->client->osdc, req);
......@@ -1790,12 +1785,9 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
0, false, true);
ceph_osdc_build_request(rd_req, 0, NULL, CEPH_NOSNAP,
&ci->vfs_inode.i_mtime);
err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);
ceph_osdc_build_request(wr_req, 0, NULL, CEPH_NOSNAP,
&ci->vfs_inode.i_mtime);
wr_req->r_mtime = ci->vfs_inode.i_mtime;
err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);
if (!err)
......
......@@ -727,8 +727,8 @@ static void ceph_aio_retry_work(struct work_struct *work)
req->r_ops[0] = orig_req->r_ops[0];
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
ceph_osdc_build_request(req, req->r_ops[0].extent.offset,
snapc, CEPH_NOSNAP, &aio_req->mtime);
req->r_mtime = aio_req->mtime;
req->r_data_offset = req->r_ops[0].extent.offset;
ceph_osdc_put_request(orig_req);
......@@ -882,14 +882,12 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
(pos+len) | (PAGE_SIZE - 1));
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
req->r_mtime = mtime;
}
osd_req_op_extent_osd_data_pages(req, 0, pages, len, start,
false, false);
ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
if (aio_req) {
aio_req->total_len += len;
aio_req->num_reqs++;
......@@ -1074,9 +1072,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
false, true);
/* BUG_ON(vino.snap != CEPH_NOSNAP); */
ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
req->r_mtime = mtime;
ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!ret)
ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
......@@ -1532,9 +1528,7 @@ static int ceph_zero_partial_object(struct inode *inode,
goto out;
}
ceph_osdc_build_request(req, offset, NULL, ceph_vino(inode).snap,
&inode->i_mtime);
req->r_mtime = inode->i_mtime;
ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!ret) {
ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
......
......@@ -104,7 +104,7 @@ struct ceph_osd_req_op {
struct ceph_osd_data response_data;
__u8 class_len;
__u8 method_len;
__u8 argc;
u32 indata_len;
} cls;
struct {
u64 cookie;
......@@ -162,14 +162,6 @@ struct ceph_osd_request {
/* request osd ops array */
unsigned int r_num_ops;
/* these are updated on each send */
__le32 *r_request_osdmap_epoch;
__le32 *r_request_flags;
__le64 *r_request_pool;
void *r_request_pgid;
__le32 *r_request_attempts;
struct ceph_eversion *r_request_reassert_version;
int r_result;
int r_got_reply;
int r_linger;
......@@ -180,16 +172,22 @@ struct ceph_osd_request {
struct completion r_completion, r_safe_completion;
ceph_osdc_callback_t r_callback;
ceph_osdc_unsafe_callback_t r_unsafe_callback;
struct ceph_eversion r_reassert_version;
struct list_head r_unsafe_item;
struct inode *r_inode; /* for use by callbacks */
void *r_priv; /* ditto */
u64 r_snapid;
unsigned long r_stamp; /* send OR check time */
/* set by submitter */
u64 r_snapid; /* for reads, CEPH_NOSNAP o/w */
struct ceph_snap_context *r_snapc; /* for writes */
struct timespec r_mtime; /* ditto */
u64 r_data_offset; /* ditto */
struct ceph_snap_context *r_snapc; /* snap context for writes */
/* internal */
unsigned long r_stamp; /* jiffies, send or check time */
int r_attempts;
struct ceph_eversion r_replay_version; /* aka reassert_version */
u32 r_last_force_resend;
struct ceph_osd_req_op r_ops[];
};
......@@ -334,11 +332,6 @@ extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *
gfp_t gfp_flags);
int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp);
extern void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
struct ceph_snap_context *snapc,
u64 snap_id,
struct timespec *mtime);
extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
struct ceph_file_layout *layout,
struct ceph_vino vino,
......
......@@ -394,6 +394,13 @@ enum {
CEPH_OSD_FLAG_SKIPRWLOCKS = 0x10000, /* skip rw locks */
CEPH_OSD_FLAG_IGNORE_OVERLAY = 0x20000, /* ignore pool overlay */
CEPH_OSD_FLAG_FLUSH = 0x40000, /* this is part of flush */
CEPH_OSD_FLAG_MAP_SNAP_CLONE = 0x80000, /* map snap direct to clone id */
CEPH_OSD_FLAG_ENFORCE_SNAPC = 0x100000, /* use snapc provided even if
pool uses pool snaps */
CEPH_OSD_FLAG_REDIRECTED = 0x200000, /* op has been redirected */
CEPH_OSD_FLAG_KNOWN_REDIR = 0x400000, /* redirect bit is authoritative */
CEPH_OSD_FLAG_FULL_TRY = 0x800000, /* try op despite full flag */
CEPH_OSD_FLAG_FULL_FORCE = 0x1000000, /* force op despite full flag */
};
enum {
......
......@@ -145,6 +145,43 @@ static int monc_show(struct seq_file *s, void *p)
return 0;
}
static void dump_target(struct seq_file *s, struct ceph_osd_request_target *t)
{
int i;
seq_printf(s, "osd%d\t%llu.%x\t[", t->osd, t->pgid.pool, t->pgid.seed);
for (i = 0; i < t->up.size; i++)
seq_printf(s, "%s%d", (!i ? "" : ","), t->up.osds[i]);
seq_printf(s, "]/%d\t[", t->up.primary);
for (i = 0; i < t->acting.size; i++)
seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]);
seq_printf(s, "]/%d\t%*pE\t0x%x", t->acting.primary,
t->target_oid.name_len, t->target_oid.name, t->flags);
if (t->paused)
seq_puts(s, "\tP");
}
static void dump_request(struct seq_file *s, struct ceph_osd_request *req)
{
int i;
seq_printf(s, "%llu\t", req->r_tid);
dump_target(s, &req->r_t);
seq_printf(s, "\t%d\t%u'%llu", req->r_attempts,
le32_to_cpu(req->r_replay_version.epoch),
le64_to_cpu(req->r_replay_version.version));
for (i = 0; i < req->r_num_ops; i++) {
struct ceph_osd_req_op *op = &req->r_ops[i];
seq_printf(s, "%s%s", (i == 0 ? "\t" : ","),
ceph_osd_op_name(op->op));
}
seq_putc(s, '\n');
}
static int osdc_show(struct seq_file *s, void *pp)
{
struct ceph_client *client = s->private;
......@@ -154,32 +191,10 @@ static int osdc_show(struct seq_file *s, void *pp)
mutex_lock(&osdc->request_mutex);
for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
struct ceph_osd_request *req;
unsigned int i;
int opcode;
req = rb_entry(p, struct ceph_osd_request, r_node);
seq_printf(s, "%lld\tosd%d\t%lld.%x\t", req->r_tid,
req->r_osd ? req->r_osd->o_osd : -1,
req->r_t.pgid.pool, req->r_t.pgid.seed);
seq_printf(s, "%*pE", req->r_base_oid.name_len,
req->r_base_oid.name);
if (req->r_reassert_version.epoch)
seq_printf(s, "\t%u'%llu",
(unsigned int)le32_to_cpu(req->r_reassert_version.epoch),
le64_to_cpu(req->r_reassert_version.version));
else
seq_printf(s, "\t");
for (i = 0; i < req->r_num_ops; i++) {
opcode = req->r_ops[i].op;
seq_printf(s, "%s%s", (i == 0 ? "\t" : ","),
ceph_osd_op_name(opcode));
}
seq_printf(s, "\n");
dump_request(s, req);
}
mutex_unlock(&osdc->request_mutex);
return 0;
......
......@@ -34,8 +34,6 @@ static void __unregister_request(struct ceph_osd_client *osdc,
static void __unregister_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
static void __enqueue_request(struct ceph_osd_request *req);
static void __send_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
/*
* Implement client access to distributed object storage cluster.
......@@ -209,6 +207,8 @@ void osd_req_op_cls_request_data_pagelist(
osd_data = osd_req_op_data(osd_req, which, cls, request_data);
ceph_osd_data_pagelist_init(osd_data, pagelist);
osd_req->r_ops[which].cls.indata_len += pagelist->length;
osd_req->r_ops[which].indata_len += pagelist->length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
......@@ -221,6 +221,8 @@ void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
osd_data = osd_req_op_data(osd_req, which, cls, request_data);
ceph_osd_data_pages_init(osd_data, pages, length, alignment,
pages_from_pool, own_pages);
osd_req->r_ops[which].cls.indata_len += length;
osd_req->r_ops[which].indata_len += length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
......@@ -610,8 +612,6 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
op->cls.argc = 0; /* currently unused */
op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_cls_init);
......@@ -709,16 +709,9 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
}
}
static u64 osd_req_encode_op(struct ceph_osd_request *req,
struct ceph_osd_op *dst, unsigned int which)
static u32 osd_req_encode_op(struct ceph_osd_op *dst,
const struct ceph_osd_req_op *src)
{
struct ceph_osd_req_op *src;
struct ceph_osd_data *osd_data;
u64 request_data_len = 0;
u64 data_length;
BUG_ON(which >= req->r_num_ops);
src = &req->r_ops[which];
if (WARN_ON(!osd_req_opcode_valid(src->op))) {
pr_err("unrecognized osd opcode %d\n", src->op);
......@@ -727,49 +720,23 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
switch (src->op) {
case CEPH_OSD_OP_STAT:
osd_data = &src->raw_data_in;
ceph_osdc_msg_data_add(req->r_reply, osd_data);
break;
case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_WRITE:
case CEPH_OSD_OP_WRITEFULL:
case CEPH_OSD_OP_ZERO:
case CEPH_OSD_OP_TRUNCATE:
if (src->op == CEPH_OSD_OP_WRITE ||
src->op == CEPH_OSD_OP_WRITEFULL)
request_data_len = src->extent.length;
dst->extent.offset = cpu_to_le64(src->extent.offset);
dst->extent.length = cpu_to_le64(src->extent.length);
dst->extent.truncate_size =
cpu_to_le64(src->extent.truncate_size);
dst->extent.truncate_seq =
cpu_to_le32(src->extent.truncate_seq);
osd_data = &src->extent.osd_data;
if (src->op == CEPH_OSD_OP_WRITE ||
src->op == CEPH_OSD_OP_WRITEFULL)
ceph_osdc_msg_data_add(req->r_request, osd_data);
else
ceph_osdc_msg_data_add(req->r_reply, osd_data);
break;
case CEPH_OSD_OP_CALL:
dst->cls.class_len = src->cls.class_len;
dst->cls.method_len = src->cls.method_len;
osd_data = &src->cls.request_info;
ceph_osdc_msg_data_add(req->r_request, osd_data);
BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
request_data_len = osd_data->pagelist->length;
osd_data = &src->cls.request_data;
data_length = ceph_osd_data_length(osd_data);
if (data_length) {
BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
dst->cls.indata_len = cpu_to_le32(data_length);
ceph_osdc_msg_data_add(req->r_request, osd_data);
src->indata_len += data_length;
request_data_len += data_length;
}
osd_data = &src->cls.response_data;
ceph_osdc_msg_data_add(req->r_reply, osd_data);
dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
break;
case CEPH_OSD_OP_STARTSYNC:
break;
......@@ -791,9 +758,6 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
dst->xattr.cmp_op = src->xattr.cmp_op;
dst->xattr.cmp_mode = src->xattr.cmp_mode;
osd_data = &src->xattr.osd_data;
ceph_osdc_msg_data_add(req->r_request, osd_data);
request_data_len = osd_data->pagelist->length;
break;
case CEPH_OSD_OP_CREATE:
case CEPH_OSD_OP_DELETE:
......@@ -810,7 +774,7 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
dst->flags = cpu_to_le32(src->flags);
dst->payload_len = cpu_to_le32(src->indata_len);
return request_data_len;
return src->indata_len;
}
/*
......@@ -852,8 +816,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
goto fail;
}
req->r_flags = flags;
/* calculate max write size */
r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
if (r)
......@@ -877,9 +839,14 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
truncate_size, truncate_seq);
}
req->r_flags = flags;
req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
req->r_snapid = vino.snap;
if (flags & CEPH_OSD_FLAG_WRITE)
req->r_data_offset = off;
r = ceph_osdc_alloc_messages(req, GFP_NOFS);
if (r)
goto fail;
......@@ -1509,37 +1476,173 @@ static int __map_request(struct ceph_osd_client *osdc,
return err;
}
/*
* caller should hold map_sem (for read) and request_mutex
*/
static void __send_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
static void setup_request_data(struct ceph_osd_request *req,
struct ceph_msg *msg)
{
void *p;
u32 data_len = 0;
int i;
if (!list_empty(&msg->data))
return;
dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
req, req->r_tid, req->r_osd->o_osd, req->r_flags,
req->r_t.pgid.pool, req->r_t.pgid.seed);
WARN_ON(msg->data_length);
for (i = 0; i < req->r_num_ops; i++) {
struct ceph_osd_req_op *op = &req->r_ops[i];
switch (op->op) {
/* request */
case CEPH_OSD_OP_WRITE:
case CEPH_OSD_OP_WRITEFULL:
WARN_ON(op->indata_len != op->extent.length);
ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
break;
case CEPH_OSD_OP_SETXATTR:
case CEPH_OSD_OP_CMPXATTR:
WARN_ON(op->indata_len != op->xattr.name_len +
op->xattr.value_len);
ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
break;
/* reply */
case CEPH_OSD_OP_STAT:
ceph_osdc_msg_data_add(req->r_reply,
&op->raw_data_in);
break;
case CEPH_OSD_OP_READ:
ceph_osdc_msg_data_add(req->r_reply,
&op->extent.osd_data);
break;
/* both */
case CEPH_OSD_OP_CALL:
WARN_ON(op->indata_len != op->cls.class_len +
op->cls.method_len +
op->cls.indata_len);
ceph_osdc_msg_data_add(msg, &op->cls.request_info);
/* optional, can be NONE */
ceph_osdc_msg_data_add(msg, &op->cls.request_data);
/* optional, can be NONE */
ceph_osdc_msg_data_add(req->r_reply,
&op->cls.response_data);
break;
}
data_len += op->indata_len;
}
/* fill in message content that changes each time we send it */
put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
put_unaligned_le32(req->r_flags, req->r_request_flags);
put_unaligned_le64(req->r_t.target_oloc.pool, req->r_request_pool);
p = req->r_request_pgid;
WARN_ON(data_len != msg->data_length);
}
static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
{
void *p = msg->front.iov_base;
void *const end = p + msg->front_alloc_len;
u32 data_len = 0;
int i;
if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
/* snapshots aren't writeable */
WARN_ON(req->r_snapid != CEPH_NOSNAP);
} else {
WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
req->r_data_offset || req->r_snapc);
}
setup_request_data(req, msg);
ceph_encode_32(&p, 1); /* client_inc, always 1 */
ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
ceph_encode_32(&p, req->r_flags);
ceph_encode_timespec(p, &req->r_mtime);
p += sizeof(struct ceph_timespec);
/* aka reassert_version */
memcpy(p, &req->r_replay_version, sizeof(req->r_replay_version));
p += sizeof(req->r_replay_version);
/* oloc */
ceph_encode_8(&p, 4);
ceph_encode_8(&p, 4);
ceph_encode_32(&p, 8 + 4 + 4);
ceph_encode_64(&p, req->r_t.target_oloc.pool);
ceph_encode_32(&p, -1); /* preferred */
ceph_encode_32(&p, 0); /* key len */
/* pgid */
ceph_encode_8(&p, 1);
ceph_encode_64(&p, req->r_t.pgid.pool);
ceph_encode_32(&p, req->r_t.pgid.seed);
put_unaligned_le64(1, req->r_request_attempts); /* FIXME */
memcpy(req->r_request_reassert_version, &req->r_reassert_version,
sizeof(req->r_reassert_version));
ceph_encode_32(&p, -1); /* preferred */
req->r_stamp = jiffies;
list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
/* oid */
ceph_encode_32(&p, req->r_t.target_oid.name_len);
memcpy(p, req->r_t.target_oid.name, req->r_t.target_oid.name_len);
p += req->r_t.target_oid.name_len;
ceph_msg_get(req->r_request); /* send consumes a ref */
/* ops, can imply data */
ceph_encode_16(&p, req->r_num_ops);
for (i = 0; i < req->r_num_ops; i++) {
data_len += osd_req_encode_op(p, &req->r_ops[i]);
p += sizeof(struct ceph_osd_op);
}
req->r_sent = req->r_osd->o_incarnation;
ceph_encode_64(&p, req->r_snapid); /* snapid */
if (req->r_snapc) {
ceph_encode_64(&p, req->r_snapc->seq);
ceph_encode_32(&p, req->r_snapc->num_snaps);
for (i = 0; i < req->r_snapc->num_snaps; i++)
ceph_encode_64(&p, req->r_snapc->snaps[i]);
} else {
ceph_encode_64(&p, 0); /* snap_seq */
ceph_encode_32(&p, 0); /* snaps len */
}
ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
BUG_ON(p > end);
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
msg->hdr.data_len = cpu_to_le32(data_len);
/*
* The header "data_off" is a hint to the receiver allowing it
* to align received data into its buffers such that there's no
* need to re-copy it before writing it to disk (direct I/O).
*/
msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
ceph_con_send(&req->r_osd->o_con, req->r_request);
dout("%s req %p oid %*pE oid_len %d front %zu data %u\n", __func__,
req, req->r_t.target_oid.name_len, req->r_t.target_oid.name,
req->r_t.target_oid.name_len, msg->front.iov_len, data_len);
}
/*
* @req has to be assigned a tid and registered.
*/
static void send_request(struct ceph_osd_request *req)
{
struct ceph_osd *osd = req->r_osd;
WARN_ON(osd->o_osd != req->r_t.osd);
req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
if (req->r_attempts)
req->r_flags |= CEPH_OSD_FLAG_RETRY;
else
WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
encode_request(req, req->r_request);
dout("%s req %p tid %llu to pg %llu.%x osd%d flags 0x%x attempt %d\n",
__func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
req->r_t.osd, req->r_flags, req->r_attempts);
req->r_t.paused = false;
req->r_stamp = jiffies;
req->r_attempts++;
req->r_sent = osd->o_incarnation;
req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
}
/*
......@@ -1550,8 +1653,10 @@ static void __send_queued(struct ceph_osd_client *osdc)
struct ceph_osd_request *req, *tmp;
dout("__send_queued\n");
list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
__send_request(osdc, req);
list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
send_request(req);
}
}
/*
......@@ -1915,8 +2020,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
req->r_result = bytes;
/* in case this is a write and we need to replay, */
req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
req->r_reassert_version.version = cpu_to_le64(reassert_version);
req->r_replay_version.epoch = cpu_to_le32(reassert_epoch);
req->r_replay_version.version = cpu_to_le64(reassert_version);
req->r_got_reply = 1;
} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
......@@ -2432,105 +2537,6 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
pr_err("osdc handle_watch_notify corrupt msg\n");
}
/*
* build new request AND message
*
*/
void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
struct ceph_snap_context *snapc, u64 snap_id,
struct timespec *mtime)
{
struct ceph_msg *msg = req->r_request;
void *p;
size_t msg_size;
int flags = req->r_flags;
u64 data_len;
unsigned int i;
req->r_snapid = snap_id;
WARN_ON(snapc != req->r_snapc);
/* encode request */
msg->hdr.version = cpu_to_le16(4);
p = msg->front.iov_base;
ceph_encode_32(&p, 1); /* client_inc is always 1 */
req->r_request_osdmap_epoch = p;
p += 4;
req->r_request_flags = p;
p += 4;
if (req->r_flags & CEPH_OSD_FLAG_WRITE)
ceph_encode_timespec(p, mtime);
p += sizeof(struct ceph_timespec);
req->r_request_reassert_version = p;
p += sizeof(struct ceph_eversion); /* will get filled in */
/* oloc */
ceph_encode_8(&p, 4);
ceph_encode_8(&p, 4);
ceph_encode_32(&p, 8 + 4 + 4);
req->r_request_pool = p;
p += 8;
ceph_encode_32(&p, -1); /* preferred */
ceph_encode_32(&p, 0); /* key len */
ceph_encode_8(&p, 1);
req->r_request_pgid = p;
p += 8 + 4;
ceph_encode_32(&p, -1); /* preferred */
/* oid */
ceph_encode_32(&p, req->r_base_oid.name_len);
memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len);
dout("oid %*pE len %d\n", req->r_base_oid.name_len,
req->r_base_oid.name, req->r_base_oid.name_len);
p += req->r_base_oid.name_len;
/* ops--can imply data */
ceph_encode_16(&p, (u16)req->r_num_ops);
data_len = 0;
for (i = 0; i < req->r_num_ops; i++) {
data_len += osd_req_encode_op(req, p, i);
p += sizeof(struct ceph_osd_op);
}
/* snaps */
ceph_encode_64(&p, req->r_snapid);
ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
if (req->r_snapc) {
for (i = 0; i < req->r_snapc->num_snaps; i++) {
ceph_encode_64(&p, req->r_snapc->snaps[i]);
}
}
req->r_request_attempts = p;
p += 4;
/* data */
if (flags & CEPH_OSD_FLAG_WRITE) {
u16 data_off;
/*
* The header "data_off" is a hint to the receiver
* allowing it to align received data into its
* buffers such that there's no need to re-copy
* it before writing it to disk (direct I/O).
*/
data_off = (u16) (off & 0xffff);
req->r_request->hdr.data_off = cpu_to_le16(data_off);
}
req->r_request->hdr.data_len = cpu_to_le32(data_len);
BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
msg_size = p - msg->front.iov_base;
msg->front.iov_len = msg_size;
msg->hdr.front_len = cpu_to_le32(msg_size);
dout("build_request msg_size was %d\n", (int)msg_size);
}
EXPORT_SYMBOL(ceph_osdc_build_request);
/*
* Register request, send initial attempt.
*/
......@@ -2749,15 +2755,12 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
return PTR_ERR(req);
/* it may be a short read due to an object boundary */
osd_req_op_extent_osd_data_pages(req, 0,
pages, *plen, page_align, false, false);
dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
off, *plen, *plen, page_align);
ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
rc = ceph_osdc_start_request(osdc, req, false);
if (!rc)
rc = ceph_osdc_wait_request(osdc, req);
......@@ -2783,7 +2786,6 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
int rc = 0;
int page_align = off & ~PAGE_MASK;
BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */
req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
......@@ -2797,8 +2799,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
false, false);
dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime);
req->r_mtime = *mtime;
rc = ceph_osdc_start_request(osdc, req, true);
if (!rc)
rc = ceph_osdc_wait_request(osdc, req);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment