Commit 38941f80 authored by Alex Elder's avatar Alex Elder Committed by Alex Elder

libceph: have messages point to their connection

When a ceph message is queued for sending it is placed on a list of
pending messages (ceph_connection->out_queue).  When they are
actually sent over the wire, they are moved from that list to
another (ceph_connection->out_sent).  When acknowledgement for the
message is received, it is removed from the sent messages list.

During that entire time the message is "in the possession" of a
single ceph connection.  Keep track of that connection in the
message.  This will be used in the next patch (and is a helpful
bit of information for debugging anyway).
Signed-off-by: default avatarAlex Elder <elder@inktank.com>
Reviewed-by: default avatarSage Weil <sage@inktank.com>
parent 1c20f2d2
...@@ -77,7 +77,10 @@ struct ceph_msg { ...@@ -77,7 +77,10 @@ struct ceph_msg {
unsigned nr_pages; /* size of page array */ unsigned nr_pages; /* size of page array */
unsigned page_alignment; /* io offset in first page */ unsigned page_alignment; /* io offset in first page */
struct ceph_pagelist *pagelist; /* instead of pages */ struct ceph_pagelist *pagelist; /* instead of pages */
struct ceph_connection *con;
struct list_head list_head; struct list_head list_head;
struct kref kref; struct kref kref;
struct bio *bio; /* instead of pages/pagelist */ struct bio *bio; /* instead of pages/pagelist */
struct bio *bio_iter; /* bio iterator */ struct bio *bio_iter; /* bio iterator */
......
...@@ -414,6 +414,9 @@ static int con_close_socket(struct ceph_connection *con) ...@@ -414,6 +414,9 @@ static int con_close_socket(struct ceph_connection *con)
static void ceph_msg_remove(struct ceph_msg *msg) static void ceph_msg_remove(struct ceph_msg *msg)
{ {
list_del_init(&msg->list_head); list_del_init(&msg->list_head);
BUG_ON(msg->con == NULL);
msg->con = NULL;
ceph_msg_put(msg); ceph_msg_put(msg);
} }
static void ceph_msg_remove_list(struct list_head *head) static void ceph_msg_remove_list(struct list_head *head)
...@@ -433,6 +436,8 @@ static void reset_connection(struct ceph_connection *con) ...@@ -433,6 +436,8 @@ static void reset_connection(struct ceph_connection *con)
ceph_msg_remove_list(&con->out_sent); ceph_msg_remove_list(&con->out_sent);
if (con->in_msg) { if (con->in_msg) {
BUG_ON(con->in_msg->con != con);
con->in_msg->con = NULL;
ceph_msg_put(con->in_msg); ceph_msg_put(con->in_msg);
con->in_msg = NULL; con->in_msg = NULL;
} }
...@@ -625,8 +630,10 @@ static void prepare_write_message(struct ceph_connection *con) ...@@ -625,8 +630,10 @@ static void prepare_write_message(struct ceph_connection *con)
&con->out_temp_ack); &con->out_temp_ack);
} }
BUG_ON(list_empty(&con->out_queue));
m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
con->out_msg = m; con->out_msg = m;
BUG_ON(m->con != con);
/* put message on sent list */ /* put message on sent list */
ceph_msg_get(m); ceph_msg_get(m);
...@@ -1806,6 +1813,8 @@ static int read_partial_message(struct ceph_connection *con) ...@@ -1806,6 +1813,8 @@ static int read_partial_message(struct ceph_connection *con)
"error allocating memory for incoming message"; "error allocating memory for incoming message";
return -ENOMEM; return -ENOMEM;
} }
BUG_ON(con->in_msg->con != con);
m = con->in_msg; m = con->in_msg;
m->front.iov_len = 0; /* haven't read it yet */ m->front.iov_len = 0; /* haven't read it yet */
if (m->middle) if (m->middle)
...@@ -1901,6 +1910,8 @@ static void process_message(struct ceph_connection *con) ...@@ -1901,6 +1910,8 @@ static void process_message(struct ceph_connection *con)
{ {
struct ceph_msg *msg; struct ceph_msg *msg;
BUG_ON(con->in_msg->con != con);
con->in_msg->con = NULL;
msg = con->in_msg; msg = con->in_msg;
con->in_msg = NULL; con->in_msg = NULL;
...@@ -2260,6 +2271,8 @@ static void ceph_fault(struct ceph_connection *con) ...@@ -2260,6 +2271,8 @@ static void ceph_fault(struct ceph_connection *con)
con_close_socket(con); con_close_socket(con);
if (con->in_msg) { if (con->in_msg) {
BUG_ON(con->in_msg->con != con);
con->in_msg->con = NULL;
ceph_msg_put(con->in_msg); ceph_msg_put(con->in_msg);
con->in_msg = NULL; con->in_msg = NULL;
} }
...@@ -2378,6 +2391,8 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) ...@@ -2378,6 +2391,8 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
/* queue */ /* queue */
mutex_lock(&con->mutex); mutex_lock(&con->mutex);
BUG_ON(msg->con != NULL);
msg->con = con;
BUG_ON(!list_empty(&msg->list_head)); BUG_ON(!list_empty(&msg->list_head));
list_add_tail(&msg->list_head, &con->out_queue); list_add_tail(&msg->list_head, &con->out_queue);
dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
...@@ -2403,13 +2418,16 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) ...@@ -2403,13 +2418,16 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
{ {
mutex_lock(&con->mutex); mutex_lock(&con->mutex);
if (!list_empty(&msg->list_head)) { if (!list_empty(&msg->list_head)) {
dout("con_revoke %p msg %p - was on queue\n", con, msg); dout("%s %p msg %p - was on queue\n", __func__, con, msg);
list_del_init(&msg->list_head); list_del_init(&msg->list_head);
BUG_ON(msg->con == NULL);
msg->con = NULL;
ceph_msg_put(msg); ceph_msg_put(msg);
msg->hdr.seq = 0; msg->hdr.seq = 0;
} }
if (con->out_msg == msg) { if (con->out_msg == msg) {
dout("con_revoke %p msg %p - was sending\n", con, msg); dout("%s %p msg %p - was sending\n", __func__, con, msg);
con->out_msg = NULL; con->out_msg = NULL;
if (con->out_kvec_is_msg) { if (con->out_kvec_is_msg) {
con->out_skip = con->out_kvec_bytes; con->out_skip = con->out_kvec_bytes;
...@@ -2478,6 +2496,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, ...@@ -2478,6 +2496,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
if (m == NULL) if (m == NULL)
goto out; goto out;
kref_init(&m->kref); kref_init(&m->kref);
m->con = NULL;
INIT_LIST_HEAD(&m->list_head); INIT_LIST_HEAD(&m->list_head);
m->hdr.tid = 0; m->hdr.tid = 0;
...@@ -2598,6 +2618,8 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con, ...@@ -2598,6 +2618,8 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
mutex_unlock(&con->mutex); mutex_unlock(&con->mutex);
con->in_msg = con->ops->alloc_msg(con, hdr, &skip); con->in_msg = con->ops->alloc_msg(con, hdr, &skip);
mutex_lock(&con->mutex); mutex_lock(&con->mutex);
if (con->in_msg)
con->in_msg->con = con;
if (skip) if (skip)
con->in_msg = NULL; con->in_msg = NULL;
...@@ -2611,6 +2633,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con, ...@@ -2611,6 +2633,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
type, front_len); type, front_len);
return false; return false;
} }
con->in_msg->con = con;
con->in_msg->page_alignment = le16_to_cpu(hdr->data_off); con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
} }
memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment