Commit a1930804 authored by Alex Elder's avatar Alex Elder Committed by Sage Weil

libceph: record message data length

Keep track of the length of the data portion for a message in a
separate field in the ceph_msg structure.  This information has
been maintained in wire byte order in the message header, but
that's going to change soon.
Signed-off-by: default avatarAlex Elder <elder@inktank.com>
Reviewed-by: default avatarJosh Durgin <josh.durgin@inktank.com>
parent fdce58cc
...@@ -139,6 +139,7 @@ struct ceph_msg { ...@@ -139,6 +139,7 @@ struct ceph_msg {
struct kvec front; /* unaligned blobs of message */ struct kvec front; /* unaligned blobs of message */
struct ceph_buffer *middle; struct ceph_buffer *middle;
size_t data_length;
struct ceph_msg_data *data; /* data payload */ struct ceph_msg_data *data; /* data payload */
struct ceph_connection *con; struct ceph_connection *con;
...@@ -270,7 +271,8 @@ extern void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages, ...@@ -270,7 +271,8 @@ extern void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
size_t length, size_t alignment); size_t length, size_t alignment);
extern void ceph_msg_data_set_pagelist(struct ceph_msg *msg, extern void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
struct ceph_pagelist *pagelist); struct ceph_pagelist *pagelist);
extern void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio); extern void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio,
size_t length);
extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
bool can_fail); bool can_fail);
......
...@@ -2981,6 +2981,7 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages, ...@@ -2981,6 +2981,7 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
BUG_ON(!pages); BUG_ON(!pages);
BUG_ON(!length); BUG_ON(!length);
BUG_ON(msg->data_length);
BUG_ON(msg->data != NULL); BUG_ON(msg->data != NULL);
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES); data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
...@@ -2990,6 +2991,7 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages, ...@@ -2990,6 +2991,7 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
data->alignment = alignment & ~PAGE_MASK; data->alignment = alignment & ~PAGE_MASK;
msg->data = data; msg->data = data;
msg->data_length = length;
} }
EXPORT_SYMBOL(ceph_msg_data_set_pages); EXPORT_SYMBOL(ceph_msg_data_set_pages);
...@@ -3000,6 +3002,7 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg, ...@@ -3000,6 +3002,7 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
BUG_ON(!pagelist); BUG_ON(!pagelist);
BUG_ON(!pagelist->length); BUG_ON(!pagelist->length);
BUG_ON(msg->data_length);
BUG_ON(msg->data != NULL); BUG_ON(msg->data != NULL);
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST); data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
...@@ -3007,14 +3010,17 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg, ...@@ -3007,14 +3010,17 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
data->pagelist = pagelist; data->pagelist = pagelist;
msg->data = data; msg->data = data;
msg->data_length = pagelist->length;
} }
EXPORT_SYMBOL(ceph_msg_data_set_pagelist); EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio) void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio,
size_t length)
{ {
struct ceph_msg_data *data; struct ceph_msg_data *data;
BUG_ON(!bio); BUG_ON(!bio);
BUG_ON(msg->data_length);
BUG_ON(msg->data != NULL); BUG_ON(msg->data != NULL);
data = ceph_msg_data_create(CEPH_MSG_DATA_BIO); data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
...@@ -3022,6 +3028,7 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio) ...@@ -3022,6 +3028,7 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
data->bio = bio; data->bio = bio;
msg->data = data; msg->data = data;
msg->data_length = length;
} }
EXPORT_SYMBOL(ceph_msg_data_set_bio); EXPORT_SYMBOL(ceph_msg_data_set_bio);
...@@ -3200,6 +3207,7 @@ void ceph_msg_last_put(struct kref *kref) ...@@ -3200,6 +3207,7 @@ void ceph_msg_last_put(struct kref *kref)
} }
ceph_msg_data_destroy(m->data); ceph_msg_data_destroy(m->data);
m->data = NULL; m->data = NULL;
m->data_length = 0;
if (m->pool) if (m->pool)
ceph_msgpool_put(m->pool, m); ceph_msgpool_put(m->pool, m);
......
...@@ -1848,7 +1848,7 @@ static void ceph_osdc_msg_data_set(struct ceph_msg *msg, ...@@ -1848,7 +1848,7 @@ static void ceph_osdc_msg_data_set(struct ceph_msg *msg,
ceph_msg_data_set_pagelist(msg, osd_data->pagelist); ceph_msg_data_set_pagelist(msg, osd_data->pagelist);
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
ceph_msg_data_set_bio(msg, osd_data->bio); ceph_msg_data_set_bio(msg, osd_data->bio, osd_data->bio_length);
#endif #endif
} else { } else {
BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment