Commit e788182f authored by Alex Elder's avatar Alex Elder Committed by Sage Weil

libceph: define and use in_msg_pos_next()

Define a new function in_msg_pos_next() to match out_msg_pos_next(),
and use it in place of code at the end of read_partial_message_pages()
and read_partial_message_bio().

Note that the page number is incremented and offset reset under
slightly different conditions from before.  The result is
equivalent, however, as explained below.

Each time an incoming message is going to arrive, we find out how
much room is left--not surpassing the current page--and provide that
as the number of bytes to receive.  So the amount we'll use is the
lesser of:  all that's left of the entire request; and all that's
left in the current page.

If we received exactly how many were requested, we either reached
the end of the request or the end of the page.  In the first case,
we're done, in the second, we move onto the next page in the array.

In all cases but (possibly) on the last page, after adding the
number of bytes received, page_pos == PAGE_SIZE.  On the last page,
it doesn't really matter whether we increment the page number and
reset the page position, because we're done and we won't come back
here again.  The code previously skipped over that last case,
basically.  The new code handles that case the same as the others,
incrementing and resetting.
Signed-off-by: default avatarAlex Elder <elder@inktank.com>
Reviewed-by: default avatarJosh Durgin <josh.durgin@inktank.com>
parent b3d56fab
...@@ -1052,6 +1052,28 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page, ...@@ -1052,6 +1052,28 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
#endif #endif
} }
static void in_msg_pos_next(struct ceph_connection *con, size_t len,
size_t received)
{
struct ceph_msg *msg = con->in_msg;
BUG_ON(!msg);
BUG_ON(!received);
con->in_msg_pos.data_pos += received;
con->in_msg_pos.page_pos += received;
if (received < len)
return;
BUG_ON(received != len);
con->in_msg_pos.page_pos = 0;
con->in_msg_pos.page++;
#ifdef CONFIG_BLOCK
if (msg->bio)
iter_bio_next(&msg->bio_iter, &msg->bio_seg);
#endif /* CONFIG_BLOCK */
}
/* /*
* Write as much message data payload as we can. If we finish, queue * Write as much message data payload as we can. If we finish, queue
* up the footer. * up the footer.
...@@ -1789,6 +1811,7 @@ static int read_partial_message_pages(struct ceph_connection *con, ...@@ -1789,6 +1811,7 @@ static int read_partial_message_pages(struct ceph_connection *con,
struct page **pages, struct page **pages,
unsigned int data_len, bool do_datacrc) unsigned int data_len, bool do_datacrc)
{ {
struct page *page;
void *p; void *p;
int ret; int ret;
int left; int left;
...@@ -1797,22 +1820,18 @@ static int read_partial_message_pages(struct ceph_connection *con, ...@@ -1797,22 +1820,18 @@ static int read_partial_message_pages(struct ceph_connection *con,
(int)(PAGE_SIZE - con->in_msg_pos.page_pos)); (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
/* (page) data */ /* (page) data */
BUG_ON(pages == NULL); BUG_ON(pages == NULL);
p = kmap(pages[con->in_msg_pos.page]); page = pages[con->in_msg_pos.page];
ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, p = kmap(page);
left); ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, left);
if (ret > 0 && do_datacrc) if (ret > 0 && do_datacrc)
con->in_data_crc = con->in_data_crc =
crc32c(con->in_data_crc, crc32c(con->in_data_crc,
p + con->in_msg_pos.page_pos, ret); p + con->in_msg_pos.page_pos, ret);
kunmap(pages[con->in_msg_pos.page]); kunmap(page);
if (ret <= 0) if (ret <= 0)
return ret; return ret;
con->in_msg_pos.data_pos += ret;
con->in_msg_pos.page_pos += ret; in_msg_pos_next(con, left, ret);
if (con->in_msg_pos.page_pos == PAGE_SIZE) {
con->in_msg_pos.page_pos = 0;
con->in_msg_pos.page++;
}
return ret; return ret;
} }
...@@ -1823,32 +1842,30 @@ static int read_partial_message_bio(struct ceph_connection *con, ...@@ -1823,32 +1842,30 @@ static int read_partial_message_bio(struct ceph_connection *con,
{ {
struct ceph_msg *msg = con->in_msg; struct ceph_msg *msg = con->in_msg;
struct bio_vec *bv; struct bio_vec *bv;
struct page *page;
void *p; void *p;
int ret, left; int ret, left;
BUG_ON(!msg); BUG_ON(!msg);
BUG_ON(!msg->bio_iter); BUG_ON(!msg->bio_iter);
bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg); bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
left = min((int)(data_len - con->in_msg_pos.data_pos), left = min((int)(data_len - con->in_msg_pos.data_pos),
(int)(bv->bv_len - con->in_msg_pos.page_pos)); (int)(bv->bv_len - con->in_msg_pos.page_pos));
p = kmap(bv->bv_page) + bv->bv_offset; page = bv->bv_page;
p = kmap(page) + bv->bv_offset;
ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, left);
left);
if (ret > 0 && do_datacrc) if (ret > 0 && do_datacrc)
con->in_data_crc = con->in_data_crc =
crc32c(con->in_data_crc, crc32c(con->in_data_crc,
p + con->in_msg_pos.page_pos, ret); p + con->in_msg_pos.page_pos, ret);
kunmap(bv->bv_page); kunmap(page);
if (ret <= 0) if (ret <= 0)
return ret; return ret;
con->in_msg_pos.data_pos += ret;
con->in_msg_pos.page_pos += ret; in_msg_pos_next(con, left, ret);
if (con->in_msg_pos.page_pos == bv->bv_len) {
con->in_msg_pos.page_pos = 0;
iter_bio_next(&msg->bio_iter, &msg->bio_seg);
}
return ret; return ret;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment