Commit 859eb799 authored by Alex Elder's avatar Alex Elder

libceph: encapsulate connection kvec operations

Encapsulate the operation of adding a new chunk of data to the next
open slot in a ceph_connection's out_kvec array.  Also add a "reset"
operation to make subsequent add operations start at the beginning
of the array again.

Use these routines throughout, avoiding duplicate code and ensuring
all calls are handled consistently.
Signed-off-by: default avatarAlex Elder <elder@dreamhost.com>
Signed-off-by: default avatarSage Weil <sage@newdream.net>
parent 963be4d7
...@@ -469,14 +469,35 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) ...@@ -469,14 +469,35 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
return ret; return ret;
} }
static void ceph_con_out_kvec_reset(struct ceph_connection *con)
{
con->out_kvec_left = 0;
con->out_kvec_bytes = 0;
con->out_kvec_cur = &con->out_kvec[0];
}
static void ceph_con_out_kvec_add(struct ceph_connection *con,
size_t size, void *data)
{
int index;
index = con->out_kvec_left;
BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
con->out_kvec[index].iov_len = size;
con->out_kvec[index].iov_base = data;
con->out_kvec_left++;
con->out_kvec_bytes += size;
}
/* /*
* Prepare footer for currently outgoing message, and finish things * Prepare footer for currently outgoing message, and finish things
* off. Assumes out_kvec* are already valid.. we just add on to the end. * off. Assumes out_kvec* are already valid.. we just add on to the end.
*/ */
static void prepare_write_message_footer(struct ceph_connection *con, int v) static void prepare_write_message_footer(struct ceph_connection *con)
{ {
struct ceph_msg *m = con->out_msg; struct ceph_msg *m = con->out_msg;
int v = con->out_kvec_left;
dout("prepare_write_message_footer %p\n", con); dout("prepare_write_message_footer %p\n", con);
con->out_kvec_is_msg = true; con->out_kvec_is_msg = true;
...@@ -494,9 +515,8 @@ static void prepare_write_message_footer(struct ceph_connection *con, int v) ...@@ -494,9 +515,8 @@ static void prepare_write_message_footer(struct ceph_connection *con, int v)
static void prepare_write_message(struct ceph_connection *con) static void prepare_write_message(struct ceph_connection *con)
{ {
struct ceph_msg *m; struct ceph_msg *m;
int v = 0;
con->out_kvec_bytes = 0; ceph_con_out_kvec_reset(con);
con->out_kvec_is_msg = true; con->out_kvec_is_msg = true;
con->out_msg_done = false; con->out_msg_done = false;
...@@ -504,16 +524,13 @@ static void prepare_write_message(struct ceph_connection *con) ...@@ -504,16 +524,13 @@ static void prepare_write_message(struct ceph_connection *con)
* TCP packet that's a good thing. */ * TCP packet that's a good thing. */
if (con->in_seq > con->in_seq_acked) { if (con->in_seq > con->in_seq_acked) {
con->in_seq_acked = con->in_seq; con->in_seq_acked = con->in_seq;
con->out_kvec[v].iov_base = &tag_ack; ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
con->out_kvec[v++].iov_len = 1;
con->out_temp_ack = cpu_to_le64(con->in_seq_acked); con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
con->out_kvec[v].iov_base = &con->out_temp_ack; ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack); &con->out_temp_ack);
con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
} }
m = list_first_entry(&con->out_queue, m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
struct ceph_msg, list_head);
con->out_msg = m; con->out_msg = m;
/* put message on sent list */ /* put message on sent list */
...@@ -537,17 +554,13 @@ static void prepare_write_message(struct ceph_connection *con) ...@@ -537,17 +554,13 @@ static void prepare_write_message(struct ceph_connection *con)
BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
/* tag + hdr + front + middle */ /* tag + hdr + front + middle */
con->out_kvec[v].iov_base = &tag_msg; ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
con->out_kvec[v++].iov_len = 1; ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
con->out_kvec[v].iov_base = &m->hdr; ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
con->out_kvec[v++].iov_len = sizeof(m->hdr);
con->out_kvec[v++] = m->front;
if (m->middle) if (m->middle)
con->out_kvec[v++] = m->middle->vec; ceph_con_out_kvec_add(con, m->middle->vec.iov_len,
con->out_kvec_left = v; m->middle->vec.iov_base);
con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len +
(m->middle ? m->middle->vec.iov_len : 0);
con->out_kvec_cur = con->out_kvec;
/* fill in crc (except data pages), footer */ /* fill in crc (except data pages), footer */
con->out_msg->hdr.crc = con->out_msg->hdr.crc =
...@@ -580,7 +593,7 @@ static void prepare_write_message(struct ceph_connection *con) ...@@ -580,7 +593,7 @@ static void prepare_write_message(struct ceph_connection *con)
con->out_more = 1; /* data + footer will follow */ con->out_more = 1; /* data + footer will follow */
} else { } else {
/* no, queue up footer too and be done */ /* no, queue up footer too and be done */
prepare_write_message_footer(con, v); prepare_write_message_footer(con);
} }
set_bit(WRITE_PENDING, &con->state); set_bit(WRITE_PENDING, &con->state);
...@@ -595,14 +608,14 @@ static void prepare_write_ack(struct ceph_connection *con) ...@@ -595,14 +608,14 @@ static void prepare_write_ack(struct ceph_connection *con)
con->in_seq_acked, con->in_seq); con->in_seq_acked, con->in_seq);
con->in_seq_acked = con->in_seq; con->in_seq_acked = con->in_seq;
con->out_kvec[0].iov_base = &tag_ack; ceph_con_out_kvec_reset(con);
con->out_kvec[0].iov_len = 1;
ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
con->out_temp_ack = cpu_to_le64(con->in_seq_acked); con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
con->out_kvec[1].iov_base = &con->out_temp_ack; ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
con->out_kvec[1].iov_len = sizeof(con->out_temp_ack); &con->out_temp_ack);
con->out_kvec_left = 2;
con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
con->out_kvec_cur = con->out_kvec;
con->out_more = 1; /* more will follow.. eventually.. */ con->out_more = 1; /* more will follow.. eventually.. */
set_bit(WRITE_PENDING, &con->state); set_bit(WRITE_PENDING, &con->state);
} }
...@@ -613,11 +626,8 @@ static void prepare_write_ack(struct ceph_connection *con) ...@@ -613,11 +626,8 @@ static void prepare_write_ack(struct ceph_connection *con)
static void prepare_write_keepalive(struct ceph_connection *con) static void prepare_write_keepalive(struct ceph_connection *con)
{ {
dout("prepare_write_keepalive %p\n", con); dout("prepare_write_keepalive %p\n", con);
con->out_kvec[0].iov_base = &tag_keepalive; ceph_con_out_kvec_reset(con);
con->out_kvec[0].iov_len = 1; ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
con->out_kvec_left = 1;
con->out_kvec_bytes = 1;
con->out_kvec_cur = con->out_kvec;
set_bit(WRITE_PENDING, &con->state); set_bit(WRITE_PENDING, &con->state);
} }
...@@ -646,12 +656,9 @@ static int prepare_connect_authorizer(struct ceph_connection *con) ...@@ -646,12 +656,9 @@ static int prepare_connect_authorizer(struct ceph_connection *con)
con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
con->out_connect.authorizer_len = cpu_to_le32(auth_len); con->out_connect.authorizer_len = cpu_to_le32(auth_len);
if (auth_len) { if (auth_len)
con->out_kvec[con->out_kvec_left].iov_base = auth_buf; ceph_con_out_kvec_add(con, auth_len, auth_buf);
con->out_kvec[con->out_kvec_left].iov_len = auth_len;
con->out_kvec_left++;
con->out_kvec_bytes += auth_len;
}
return 0; return 0;
} }
...@@ -661,15 +668,11 @@ static int prepare_connect_authorizer(struct ceph_connection *con) ...@@ -661,15 +668,11 @@ static int prepare_connect_authorizer(struct ceph_connection *con)
static void prepare_write_banner(struct ceph_messenger *msgr, static void prepare_write_banner(struct ceph_messenger *msgr,
struct ceph_connection *con) struct ceph_connection *con)
{ {
int len = strlen(CEPH_BANNER); ceph_con_out_kvec_reset(con);
ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
ceph_con_out_kvec_add(con, sizeof (msgr->my_enc_addr),
&msgr->my_enc_addr);
con->out_kvec[0].iov_base = CEPH_BANNER;
con->out_kvec[0].iov_len = len;
con->out_kvec[1].iov_base = &msgr->my_enc_addr;
con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
con->out_kvec_left = 2;
con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr);
con->out_kvec_cur = con->out_kvec;
con->out_more = 0; con->out_more = 0;
set_bit(WRITE_PENDING, &con->state); set_bit(WRITE_PENDING, &con->state);
} }
...@@ -707,22 +710,16 @@ static int prepare_write_connect(struct ceph_messenger *msgr, ...@@ -707,22 +710,16 @@ static int prepare_write_connect(struct ceph_messenger *msgr,
if (include_banner) if (include_banner)
prepare_write_banner(msgr, con); prepare_write_banner(msgr, con);
else { else
con->out_kvec_left = 0; ceph_con_out_kvec_reset(con);
con->out_kvec_bytes = 0; ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect);
}
con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect;
con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect);
con->out_kvec_left++;
con->out_kvec_bytes += sizeof(con->out_connect);
con->out_kvec_cur = con->out_kvec;
con->out_more = 0; con->out_more = 0;
set_bit(WRITE_PENDING, &con->state); set_bit(WRITE_PENDING, &con->state);
return prepare_connect_authorizer(con); return prepare_connect_authorizer(con);
} }
/* /*
* write as much of pending kvecs to the socket as we can. * write as much of pending kvecs to the socket as we can.
* 1 -> done * 1 -> done
...@@ -919,10 +916,8 @@ static int write_partial_msg_pages(struct ceph_connection *con) ...@@ -919,10 +916,8 @@ static int write_partial_msg_pages(struct ceph_connection *con)
/* prepare and queue up footer, too */ /* prepare and queue up footer, too */
if (!crc) if (!crc)
con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
con->out_kvec_bytes = 0; ceph_con_out_kvec_reset(con);
con->out_kvec_left = 0; prepare_write_message_footer(con);
con->out_kvec_cur = con->out_kvec;
prepare_write_message_footer(con, 0);
ret = 1; ret = 1;
out: out:
return ret; return ret;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment