Commit f5db90bc authored by Alex Elder's avatar Alex Elder Committed by Sage Weil

libceph: kill last of ceph_msg_pos

The only remaining field in the ceph_msg_pos structure is
did_page_crc.  In the new cursor model of things that flag (or
something like it) belongs in the cursor.

Define a new field "need_crc" in the cursor (which applies to all
types of data) and initialize it to true whenever a cursor is
initialized.

In write_partial_message_data(), the data CRC still will be computed
as before, but it will check the cursor->need_crc field to determine
whether it's needed.  Any time the cursor is advanced to a new piece
of a data item, need_crc will be set, and this will cause the crc
for that entire piece to be accumulated into the data crc.

In write_partial_message_data() the intermediate crc value is now
held in a local variable so it doesn't have to be byte-swapped so
many times.  In read_partial_msg_data() we do something similar
(but mainly for consistency there).

With that, the ceph_msg_pos structure can go away,  and it no longer
needs to be passed as an argument to prepare_message_data().

This cleanup is related to:
    http://tracker.ceph.com/issues/4428Signed-off-by: default avatarAlex Elder <elder@inktank.com>
Reviewed-by: default avatarJosh Durgin <josh.durgin@inktank.com>
parent 859a35d5
...@@ -93,6 +93,7 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type) ...@@ -93,6 +93,7 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
struct ceph_msg_data_cursor { struct ceph_msg_data_cursor {
size_t resid; /* bytes not yet consumed */ size_t resid; /* bytes not yet consumed */
bool last_piece; /* now at last piece of data item */ bool last_piece; /* now at last piece of data item */
bool need_crc; /* new piece; crc update needed */
union { union {
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
struct { /* bio */ struct { /* bio */
...@@ -156,10 +157,6 @@ struct ceph_msg { ...@@ -156,10 +157,6 @@ struct ceph_msg {
struct ceph_msgpool *pool; struct ceph_msgpool *pool;
}; };
struct ceph_msg_pos {
bool did_page_crc; /* true if we've calculated crc for current page */
};
/* ceph connection fault delay defaults, for exponential backoff */ /* ceph connection fault delay defaults, for exponential backoff */
#define BASE_DELAY_INTERVAL (HZ/2) #define BASE_DELAY_INTERVAL (HZ/2)
#define MAX_DELAY_INTERVAL (5 * 60 * HZ) #define MAX_DELAY_INTERVAL (5 * 60 * HZ)
...@@ -217,7 +214,6 @@ struct ceph_connection { ...@@ -217,7 +214,6 @@ struct ceph_connection {
struct ceph_msg *out_msg; /* sending message (== tail of struct ceph_msg *out_msg; /* sending message (== tail of
out_sent) */ out_sent) */
bool out_msg_done; bool out_msg_done;
struct ceph_msg_pos out_msg_pos;
struct kvec out_kvec[8], /* sending header/footer data */ struct kvec out_kvec[8], /* sending header/footer data */
*out_kvec_cur; *out_kvec_cur;
...@@ -231,7 +227,6 @@ struct ceph_connection { ...@@ -231,7 +227,6 @@ struct ceph_connection {
/* message in temps */ /* message in temps */
struct ceph_msg_header in_hdr; struct ceph_msg_header in_hdr;
struct ceph_msg *in_msg; struct ceph_msg *in_msg;
struct ceph_msg_pos in_msg_pos;
u32 in_front_crc, in_middle_crc, in_data_crc; /* calculated crc */ u32 in_front_crc, in_middle_crc, in_data_crc; /* calculated crc */
char in_tag; /* protocol control byte */ char in_tag; /* protocol control byte */
......
...@@ -1002,6 +1002,7 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data, ...@@ -1002,6 +1002,7 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data,
/* BUG(); */ /* BUG(); */
break; break;
} }
data->cursor.need_crc = true;
} }
/* /*
...@@ -1069,12 +1070,12 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes) ...@@ -1069,12 +1070,12 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
BUG(); BUG();
break; break;
} }
data->cursor.need_crc = new_piece;
return new_piece; return new_piece;
} }
static void prepare_message_data(struct ceph_msg *msg, static void prepare_message_data(struct ceph_msg *msg)
struct ceph_msg_pos *msg_pos)
{ {
size_t data_len; size_t data_len;
...@@ -1086,8 +1087,6 @@ static void prepare_message_data(struct ceph_msg *msg, ...@@ -1086,8 +1087,6 @@ static void prepare_message_data(struct ceph_msg *msg,
/* Initialize data cursor */ /* Initialize data cursor */
ceph_msg_data_cursor_init(&msg->data, data_len); ceph_msg_data_cursor_init(&msg->data, data_len);
msg_pos->did_page_crc = false;
} }
/* /*
...@@ -1186,7 +1185,7 @@ static void prepare_write_message(struct ceph_connection *con) ...@@ -1186,7 +1185,7 @@ static void prepare_write_message(struct ceph_connection *con)
/* is there a data payload? */ /* is there a data payload? */
con->out_msg->footer.data_crc = 0; con->out_msg->footer.data_crc = 0;
if (m->hdr.data_len) { if (m->hdr.data_len) {
prepare_message_data(con->out_msg, &con->out_msg_pos); prepare_message_data(con->out_msg);
con->out_more = 1; /* data + footer will follow */ con->out_more = 1; /* data + footer will follow */
} else { } else {
/* no, queue up footer too and be done */ /* no, queue up footer too and be done */
...@@ -1388,8 +1387,7 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page, ...@@ -1388,8 +1387,7 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
size_t len, size_t sent) size_t len, size_t sent)
{ {
struct ceph_msg *msg = con->out_msg; struct ceph_msg *msg = con->out_msg;
struct ceph_msg_pos *msg_pos = &con->out_msg_pos; bool need_crc;
bool need_crc = false;
BUG_ON(!msg); BUG_ON(!msg);
BUG_ON(!sent); BUG_ON(!sent);
...@@ -1401,7 +1399,6 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page, ...@@ -1401,7 +1399,6 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
return; return;
BUG_ON(sent != len); BUG_ON(sent != len);
msg_pos->did_page_crc = false;
} }
static void in_msg_pos_next(struct ceph_connection *con, size_t len, static void in_msg_pos_next(struct ceph_connection *con, size_t len,
...@@ -1444,9 +1441,8 @@ static int write_partial_message_data(struct ceph_connection *con) ...@@ -1444,9 +1441,8 @@ static int write_partial_message_data(struct ceph_connection *con)
{ {
struct ceph_msg *msg = con->out_msg; struct ceph_msg *msg = con->out_msg;
struct ceph_msg_data_cursor *cursor = &msg->data.cursor; struct ceph_msg_data_cursor *cursor = &msg->data.cursor;
struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
bool do_datacrc = !con->msgr->nocrc; bool do_datacrc = !con->msgr->nocrc;
int ret; u32 crc;
dout("%s %p msg %p\n", __func__, con, msg); dout("%s %p msg %p\n", __func__, con, msg);
...@@ -1461,38 +1457,40 @@ static int write_partial_message_data(struct ceph_connection *con) ...@@ -1461,38 +1457,40 @@ static int write_partial_message_data(struct ceph_connection *con)
* need to map the page. If we have no pages, they have * need to map the page. If we have no pages, they have
* been revoked, so use the zero page. * been revoked, so use the zero page.
*/ */
crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
while (cursor->resid) { while (cursor->resid) {
struct page *page; struct page *page;
size_t page_offset; size_t page_offset;
size_t length; size_t length;
bool last_piece; bool last_piece;
int ret;
page = ceph_msg_data_next(&msg->data, &page_offset, &length, page = ceph_msg_data_next(&msg->data, &page_offset, &length,
&last_piece); &last_piece);
if (do_datacrc && !msg_pos->did_page_crc) { if (do_datacrc && cursor->need_crc)
u32 crc = le32_to_cpu(msg->footer.data_crc);
crc = ceph_crc32c_page(crc, page, page_offset, length); crc = ceph_crc32c_page(crc, page, page_offset, length);
msg->footer.data_crc = cpu_to_le32(crc);
msg_pos->did_page_crc = true;
}
ret = ceph_tcp_sendpage(con->sock, page, page_offset, ret = ceph_tcp_sendpage(con->sock, page, page_offset,
length, last_piece); length, last_piece);
if (ret <= 0) if (ret <= 0) {
goto out; if (do_datacrc)
msg->footer.data_crc = cpu_to_le32(crc);
return ret;
}
out_msg_pos_next(con, page, length, (size_t) ret); out_msg_pos_next(con, page, length, (size_t) ret);
} }
dout("%s %p msg %p done\n", __func__, con, msg); dout("%s %p msg %p done\n", __func__, con, msg);
/* prepare and queue up footer, too */ /* prepare and queue up footer, too */
if (!do_datacrc) if (do_datacrc)
msg->footer.data_crc = cpu_to_le32(crc);
else
msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
con_out_kvec_reset(con); con_out_kvec_reset(con);
prepare_write_message_footer(con); prepare_write_message_footer(con);
ret = 1;
out: return 1; /* must return > 0 to indicate success */
return ret;
} }
/* /*
...@@ -2144,24 +2142,32 @@ static int read_partial_msg_data(struct ceph_connection *con) ...@@ -2144,24 +2142,32 @@ static int read_partial_msg_data(struct ceph_connection *con)
struct page *page; struct page *page;
size_t page_offset; size_t page_offset;
size_t length; size_t length;
u32 crc = 0;
int ret; int ret;
BUG_ON(!msg); BUG_ON(!msg);
if (WARN_ON(!ceph_msg_has_data(msg))) if (WARN_ON(!ceph_msg_has_data(msg)))
return -EIO; return -EIO;
if (do_datacrc)
crc = con->in_data_crc;
while (cursor->resid) { while (cursor->resid) {
page = ceph_msg_data_next(&msg->data, &page_offset, &length, page = ceph_msg_data_next(&msg->data, &page_offset, &length,
NULL); NULL);
ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
if (ret <= 0) if (ret <= 0) {
if (do_datacrc)
con->in_data_crc = crc;
return ret; return ret;
}
if (do_datacrc) if (do_datacrc)
con->in_data_crc = ceph_crc32c_page(con->in_data_crc, crc = ceph_crc32c_page(crc, page, page_offset, ret);
page, page_offset, ret);
in_msg_pos_next(con, length, ret); in_msg_pos_next(con, length, ret);
} }
if (do_datacrc)
con->in_data_crc = crc;
return 1; /* must return > 0 to indicate success */ return 1; /* must return > 0 to indicate success */
} }
...@@ -2257,7 +2263,7 @@ static int read_partial_message(struct ceph_connection *con) ...@@ -2257,7 +2263,7 @@ static int read_partial_message(struct ceph_connection *con)
/* prepare for data payload, if any */ /* prepare for data payload, if any */
if (data_len) if (data_len)
prepare_message_data(con->in_msg, &con->in_msg_pos); prepare_message_data(con->in_msg);
} }
/* front */ /* front */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment