Commit ea8994cb authored by Sowmini Varadhan's avatar Sowmini Varadhan Committed by David S. Miller

rds: hold a sock ref from rds_message to the rds_sock

The existing model holds a reference from the rds_sock to the
rds_message, but the rds_message does not itself hold a sock_put()
on the rds_sock. Instead the m_rs field in the rds_message is
assigned when the message is queued on the sock, and nulled when
the message is dequeued from the sock.

We want to be able to notify userspace when the rds_message
is actually freed (from rds_message_purge(), after the refcounts
to the rds_message go to 0). At the time that rds_message_purge()
is called, the message is no longer on the rds_sock retransmit
queue. Thus the explicit reference for the m_rs is needed to
send a notification that will signal to userspace that
it is now safe to free/reuse any pages that may have
been pinned down for zerocopy.

This patch manages the m_rs assignment in the rds_message with
the necessary refcount book-keeping.
Signed-off-by: default avatarSowmini Varadhan <sowmini.varadhan@oracle.com>
Acked-by: default avatarSantosh Shilimkar <santosh.shilimkar@oracle.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 6f89dbce
...@@ -58,7 +58,7 @@ EXPORT_SYMBOL_GPL(rds_message_addref); ...@@ -58,7 +58,7 @@ EXPORT_SYMBOL_GPL(rds_message_addref);
*/ */
static void rds_message_purge(struct rds_message *rm) static void rds_message_purge(struct rds_message *rm)
{ {
unsigned long i; unsigned long i, flags;
if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags))) if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags)))
return; return;
...@@ -69,6 +69,12 @@ static void rds_message_purge(struct rds_message *rm) ...@@ -69,6 +69,12 @@ static void rds_message_purge(struct rds_message *rm)
__free_page(sg_page(&rm->data.op_sg[i])); __free_page(sg_page(&rm->data.op_sg[i]));
} }
rm->data.op_nents = 0; rm->data.op_nents = 0;
spin_lock_irqsave(&rm->m_rs_lock, flags);
if (rm->m_rs) {
sock_put(rds_rs_to_sk(rm->m_rs));
rm->m_rs = NULL;
}
spin_unlock_irqrestore(&rm->m_rs_lock, flags);
if (rm->rdma.op_active) if (rm->rdma.op_active)
rds_rdma_free_op(&rm->rdma); rds_rdma_free_op(&rm->rdma);
......
...@@ -649,7 +649,6 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status) ...@@ -649,7 +649,6 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status)
rm->rdma.op_notifier = NULL; rm->rdma.op_notifier = NULL;
} }
was_on_sock = 1; was_on_sock = 1;
rm->m_rs = NULL;
} }
spin_unlock(&rs->rs_lock); spin_unlock(&rs->rs_lock);
...@@ -756,9 +755,6 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) ...@@ -756,9 +755,6 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
*/ */
if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) { if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
spin_unlock_irqrestore(&cp->cp_lock, flags); spin_unlock_irqrestore(&cp->cp_lock, flags);
spin_lock_irqsave(&rm->m_rs_lock, flags);
rm->m_rs = NULL;
spin_unlock_irqrestore(&rm->m_rs_lock, flags);
continue; continue;
} }
list_del_init(&rm->m_conn_item); list_del_init(&rm->m_conn_item);
...@@ -774,7 +770,6 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) ...@@ -774,7 +770,6 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
__rds_send_complete(rs, rm, RDS_RDMA_CANCELED); __rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
spin_unlock(&rs->rs_lock); spin_unlock(&rs->rs_lock);
rm->m_rs = NULL;
spin_unlock_irqrestore(&rm->m_rs_lock, flags); spin_unlock_irqrestore(&rm->m_rs_lock, flags);
rds_message_put(rm); rds_message_put(rm);
...@@ -798,7 +793,6 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) ...@@ -798,7 +793,6 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
__rds_send_complete(rs, rm, RDS_RDMA_CANCELED); __rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
spin_unlock(&rs->rs_lock); spin_unlock(&rs->rs_lock);
rm->m_rs = NULL;
spin_unlock_irqrestore(&rm->m_rs_lock, flags); spin_unlock_irqrestore(&rm->m_rs_lock, flags);
rds_message_put(rm); rds_message_put(rm);
...@@ -849,6 +843,7 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn, ...@@ -849,6 +843,7 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
list_add_tail(&rm->m_sock_item, &rs->rs_send_queue); list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
set_bit(RDS_MSG_ON_SOCK, &rm->m_flags); set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
rds_message_addref(rm); rds_message_addref(rm);
sock_hold(rds_rs_to_sk(rs));
rm->m_rs = rs; rm->m_rs = rs;
/* The code ordering is a little weird, but we're /* The code ordering is a little weird, but we're
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment