Commit 13d1ad16 authored by Ilya Dryomov's avatar Ilya Dryomov

libceph: move message allocation out of ceph_osdc_alloc_request()

The size of ->r_request and ->r_reply messages depends on the size of
the object name (ceph_object_id), while the size of ceph_osd_request is
fixed.  Move message allocation into a separate function that would
have to be called after ceph_object_id and ceph_object_locator (which
is also going to become variable in size with RADOS namespaces) have
been filled in:

    req = ceph_osdc_alloc_request(...);
    <fill in req->r_base_oid>
    <fill in req->r_base_oloc>
    ceph_osdc_alloc_messages(req);
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 84127282
...@@ -1954,7 +1954,7 @@ static struct ceph_osd_request *rbd_osd_req_create( ...@@ -1954,7 +1954,7 @@ static struct ceph_osd_request *rbd_osd_req_create(
osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
GFP_NOIO); GFP_NOIO);
if (!osd_req) if (!osd_req)
return NULL; /* ENOMEM */ goto fail;
if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
...@@ -1967,7 +1967,14 @@ static struct ceph_osd_request *rbd_osd_req_create( ...@@ -1967,7 +1967,14 @@ static struct ceph_osd_request *rbd_osd_req_create(
osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name); ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
goto fail;
return osd_req; return osd_req;
fail:
ceph_osdc_put_request(osd_req);
return NULL;
} }
/* /*
...@@ -2003,7 +2010,7 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) ...@@ -2003,7 +2010,7 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops, osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
false, GFP_NOIO); false, GFP_NOIO);
if (!osd_req) if (!osd_req)
return NULL; /* ENOMEM */ goto fail;
osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
osd_req->r_callback = rbd_osd_req_callback; osd_req->r_callback = rbd_osd_req_callback;
...@@ -2012,7 +2019,14 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) ...@@ -2012,7 +2019,14 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name); ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
goto fail;
return osd_req; return osd_req;
fail:
ceph_osdc_put_request(osd_req);
return NULL;
} }
......
...@@ -1762,6 +1762,10 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool) ...@@ -1762,6 +1762,10 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
"%llx.00000000", ci->i_vino.ino); "%llx.00000000", ci->i_vino.ino);
rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name); rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name);
err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
if (err)
goto out_unlock;
wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL, wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
1, false, GFP_NOFS); 1, false, GFP_NOFS);
if (!wr_req) { if (!wr_req) {
...@@ -1775,6 +1779,10 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool) ...@@ -1775,6 +1779,10 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
wr_req->r_base_oloc.pool = pool; wr_req->r_base_oloc.pool = pool;
wr_req->r_base_oid = rd_req->r_base_oid; wr_req->r_base_oid = rd_req->r_base_oid;
err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS);
if (err)
goto out_unlock;
/* one page should be large enough for STAT data */ /* one page should be large enough for STAT data */
pages = ceph_alloc_page_vector(1, GFP_KERNEL); pages = ceph_alloc_page_vector(1, GFP_KERNEL);
if (IS_ERR(pages)) { if (IS_ERR(pages)) {
......
...@@ -717,6 +717,13 @@ static void ceph_aio_retry_work(struct work_struct *work) ...@@ -717,6 +717,13 @@ static void ceph_aio_retry_work(struct work_struct *work)
req->r_base_oloc = orig_req->r_base_oloc; req->r_base_oloc = orig_req->r_base_oloc;
req->r_base_oid = orig_req->r_base_oid; req->r_base_oid = orig_req->r_base_oid;
ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
if (ret) {
ceph_osdc_put_request(req);
req = orig_req;
goto out;
}
req->r_ops[0] = orig_req->r_ops[0]; req->r_ops[0] = orig_req->r_ops[0];
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
......
...@@ -322,6 +322,7 @@ extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client * ...@@ -322,6 +322,7 @@ extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *
unsigned int num_ops, unsigned int num_ops,
bool use_mempool, bool use_mempool,
gfp_t gfp_flags); gfp_t gfp_flags);
int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp);
extern void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, extern void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
struct ceph_snap_context *snapc, struct ceph_snap_context *snapc,
......
...@@ -369,8 +369,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, ...@@ -369,8 +369,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
gfp_t gfp_flags) gfp_t gfp_flags)
{ {
struct ceph_osd_request *req; struct ceph_osd_request *req;
struct ceph_msg *msg;
size_t msg_size;
if (use_mempool) { if (use_mempool) {
BUG_ON(num_ops > CEPH_OSD_SLAB_OPS); BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
...@@ -407,53 +405,59 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, ...@@ -407,53 +405,59 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
req->r_base_oloc.pool = -1; req->r_base_oloc.pool = -1;
req->r_target_oloc.pool = -1; req->r_target_oloc.pool = -1;
msg_size = OSD_OPREPLY_FRONT_LEN; dout("%s req %p\n", __func__, req);
if (num_ops > CEPH_OSD_SLAB_OPS) { return req;
/* ceph_osd_op and rval */ }
msg_size += (num_ops - CEPH_OSD_SLAB_OPS) * EXPORT_SYMBOL(ceph_osdc_alloc_request);
(sizeof(struct ceph_osd_op) + 4);
}
/* create reply message */ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
if (use_mempool) {
msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); struct ceph_osd_client *osdc = req->r_osdc;
else struct ceph_msg *msg;
msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, int msg_size;
gfp_flags, true);
if (!msg) {
ceph_osdc_put_request(req);
return NULL;
}
req->r_reply = msg;
/* create request message */
msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */ msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */ msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
msg_size += 1 + 8 + 4 + 4; /* pgid */ msg_size += 1 + 8 + 4 + 4; /* pgid */
msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */ msg_size += 4 + req->r_base_oid.name_len; /* oid */
msg_size += 2 + num_ops * sizeof(struct ceph_osd_op); msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
msg_size += 8; /* snapid */ msg_size += 8; /* snapid */
msg_size += 8; /* snap_seq */ msg_size += 8; /* snap_seq */
msg_size += 4 + 8 * (snapc ? snapc->num_snaps : 0); /* snaps */ msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
msg_size += 4; /* retry_attempt */ msg_size += 4; /* retry_attempt */
/* create request message; allow space for oid */ if (req->r_mempool)
if (use_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op, 0); msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
else else
msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true); msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
if (!msg) { if (!msg)
ceph_osdc_put_request(req); return -ENOMEM;
return NULL;
}
memset(msg->front.iov_base, 0, msg->front.iov_len); memset(msg->front.iov_base, 0, msg->front.iov_len);
req->r_request = msg; req->r_request = msg;
return req; /* create reply message */
msg_size = OSD_OPREPLY_FRONT_LEN;
if (req->r_num_ops > CEPH_OSD_SLAB_OPS) {
/* ceph_osd_op and rval */
msg_size += (req->r_num_ops - CEPH_OSD_SLAB_OPS) *
(sizeof(struct ceph_osd_op) + 4);
}
if (req->r_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
else
msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
if (!msg)
return -ENOMEM;
req->r_reply = msg;
return 0;
} }
EXPORT_SYMBOL(ceph_osdc_alloc_request); EXPORT_SYMBOL(ceph_osdc_alloc_messages);
static bool osd_req_opcode_valid(u16 opcode) static bool osd_req_opcode_valid(u16 opcode)
{ {
...@@ -828,17 +832,17 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, ...@@ -828,17 +832,17 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
GFP_NOFS); GFP_NOFS);
if (!req) if (!req) {
return ERR_PTR(-ENOMEM); r = -ENOMEM;
goto fail;
}
req->r_flags = flags; req->r_flags = flags;
/* calculate max write size */ /* calculate max write size */
r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen); r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
if (r < 0) { if (r)
ceph_osdc_put_request(req); goto fail;
return ERR_PTR(r);
}
if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) { if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
osd_req_op_init(req, which, opcode, 0); osd_req_op_init(req, which, opcode, 0);
...@@ -864,7 +868,15 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, ...@@ -864,7 +868,15 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
"%llx.%08llx", vino.ino, objnum); "%llx.%08llx", vino.ino, objnum);
req->r_base_oid.name_len = strlen(req->r_base_oid.name); req->r_base_oid.name_len = strlen(req->r_base_oid.name);
r = ceph_osdc_alloc_messages(req, GFP_NOFS);
if (r)
goto fail;
return req; return req;
fail:
ceph_osdc_put_request(req);
return ERR_PTR(r);
} }
EXPORT_SYMBOL(ceph_osdc_new_request); EXPORT_SYMBOL(ceph_osdc_new_request);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment