Commit 1f9ecd7e authored by Sowmini Varadhan, committed by David S. Miller

RDS: Pass rds_conn_path to rds_send_xmit()

Pass a struct rds_conn_path to rds_send_xmit so that MP capable
transports can transmit packets on something other than c_path[0].
The eventual goal for MP capable transports is to hash the rds
socket to a path based on the bound local address/port, and use
this path as the argument to rds_send_xmit().
Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
parent 780a6d9e
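The hashing itself arrives later in the series; purely as an illustration of the intent stated above, the eventual selection could look something like the sketch below. Everything in it is hypothetical at this point in the series: the helper name, the use of jhash_1word(), and the RDS_MPATH_WORKERS path count are assumptions, not code from this commit.

#include <linux/jhash.h>

/* Hypothetical sketch: pin a socket to one path by hashing its bound
 * local port, so all traffic from that socket stays ordered on one path.
 * RDS_MPATH_WORKERS is an assumed name for the number of paths.
 */
static struct rds_conn_path *rds_sock_to_path(struct rds_sock *rs,
                                              struct rds_connection *conn)
{
        u32 hash = jhash_1word((u32)rs->rs_bound_port, 0);

        return &conn->c_path[hash % RDS_MPATH_WORKERS];
}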
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -274,7 +274,7 @@ static void rds_ib_tasklet_fn_send(unsigned long data)
 	if (rds_conn_up(conn) &&
 	    (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
 	    test_bit(0, &conn->c_map_queued)))
-		rds_send_xmit(ic->conn);
+		rds_send_xmit(&ic->conn->c_path[0]);
 }
 
 static void poll_rcq(struct rds_ib_connection *ic, struct ib_cq *cq,
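The IB transport is not MP-capable, so its send-completion tasklet names path 0 explicitly; &ic->conn->c_path[0] reaches the same per-connection send state that the old ic->conn argument did, just addressed through the embedded rds_conn_path.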
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -457,7 +457,9 @@ struct rds_transport {
 	int (*conn_connect)(struct rds_connection *conn);
 	void (*conn_shutdown)(struct rds_connection *conn);
 	void (*xmit_prepare)(struct rds_connection *conn);
+	void (*xmit_path_prepare)(struct rds_conn_path *cp);
 	void (*xmit_complete)(struct rds_connection *conn);
+	void (*xmit_path_complete)(struct rds_conn_path *cp);
 	int (*xmit)(struct rds_connection *conn, struct rds_message *rm,
 		    unsigned int hdr_off, unsigned int sg, unsigned int off);
 	int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
@@ -780,7 +782,7 @@ void rds_inc_info_copy(struct rds_incoming *inc,
 /* send.c */
 int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len);
 void rds_send_reset(struct rds_connection *conn);
-int rds_send_xmit(struct rds_connection *conn);
+int rds_send_xmit(struct rds_conn_path *cp);
 struct sockaddr_in;
 void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest);
 typedef int (*is_acked_func)(struct rds_message *rm, uint64_t ack);
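The rds.h change keeps the old connection-level hooks and adds path-level variants beside them rather than converting every transport at once. An MP-capable transport would fill in the new hooks; a minimal sketch follows. The rds_tcp_* names here are illustrative only — TCP gains these hooks later in the series, and the bodies below are placeholders, not code from this commit.

/* Sketch: how an MP-capable transport might populate the new hooks. */
static void rds_tcp_xmit_path_prepare(struct rds_conn_path *cp)
{
        /* e.g. cork the socket backing this path before a burst */
}

static void rds_tcp_xmit_path_complete(struct rds_conn_path *cp)
{
        /* e.g. uncork and push queued data for this path */
}

struct rds_transport rds_tcp_transport = {
        .xmit_path_prepare  = rds_tcp_xmit_path_prepare,
        .xmit_path_complete = rds_tcp_xmit_path_complete,
        /* ... remaining ops ... */
};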
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -107,14 +107,14 @@ void rds_send_reset(struct rds_connection *conn)
 }
 EXPORT_SYMBOL_GPL(rds_send_reset);
 
-static int acquire_in_xmit(struct rds_connection *conn)
+static int acquire_in_xmit(struct rds_conn_path *cp)
 {
-	return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+	return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0;
 }
 
-static void release_in_xmit(struct rds_connection *conn)
+static void release_in_xmit(struct rds_conn_path *cp)
 {
-	clear_bit(RDS_IN_XMIT, &conn->c_flags);
+	clear_bit(RDS_IN_XMIT, &cp->cp_flags);
 	smp_mb__after_atomic();
 	/*
 	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
@@ -122,8 +122,8 @@ static void release_in_xmit(struct rds_connection *conn)
 	 * the system-wide hashed waitqueue buckets in the fast path only to
 	 * almost never find waiters.
 	 */
-	if (waitqueue_active(&conn->c_waitq))
-		wake_up_all(&conn->c_waitq);
+	if (waitqueue_active(&cp->cp_waitq))
+		wake_up_all(&cp->cp_waitq);
 }
 
 /*
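acquire_in_xmit()/release_in_xmit() implement a hand-rolled try-lock on the RDS_IN_XMIT bit, which now lives in cp->cp_flags so each path gets an independent transmitter. The pattern in isolation looks like the standalone sketch below (illustrative, not code from this commit):

/* Try-lock on a flag bit: returns true if we now own the bit.
 * test_and_set_bit() implies a full barrier, so ownership is visible
 * before the caller touches the state the bit protects.
 */
static bool try_acquire(unsigned long *flags)
{
        return !test_and_set_bit(RDS_IN_XMIT, flags);
}

static void release(unsigned long *flags, wait_queue_head_t *waitq)
{
        clear_bit(RDS_IN_XMIT, flags);
        /* Order the clear_bit() before the waitqueue_active() read;
         * without this barrier a concurrent sleeper could be missed.
         */
        smp_mb__after_atomic();
        if (waitqueue_active(waitq))
                wake_up_all(waitq);
}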
@@ -140,8 +140,9 @@ static void release_in_xmit(struct rds_connection *conn)
  * - small message latency is higher behind queued large messages
  * - large message latency isn't starved by intervening small sends
  */
-int rds_send_xmit(struct rds_connection *conn)
+int rds_send_xmit(struct rds_conn_path *cp)
 {
+	struct rds_connection *conn = cp->cp_conn;
 	struct rds_message *rm;
 	unsigned long flags;
 	unsigned int tmp;
@@ -161,7 +162,7 @@ int rds_send_xmit(struct rds_connection *conn)
 	 * avoids blocking the caller and trading per-connection data between
 	 * caches per message.
 	 */
-	if (!acquire_in_xmit(conn)) {
+	if (!acquire_in_xmit(cp)) {
 		rds_stats_inc(s_send_lock_contention);
 		ret = -ENOMEM;
 		goto out;
@@ -175,21 +176,25 @@ int rds_send_xmit(struct rds_connection *conn)
 	 * The acquire_in_xmit() check above ensures that only one
 	 * caller can increment c_send_gen at any time.
 	 */
-	conn->c_send_gen++;
-	send_gen = conn->c_send_gen;
+	cp->cp_send_gen++;
+	send_gen = cp->cp_send_gen;
 
 	/*
 	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
 	 * we do the opposite to avoid races.
 	 */
-	if (!rds_conn_up(conn)) {
-		release_in_xmit(conn);
+	if (!rds_conn_path_up(cp)) {
+		release_in_xmit(cp);
 		ret = 0;
 		goto out;
 	}
 
-	if (conn->c_trans->xmit_prepare)
+	if (conn->c_trans->t_mp_capable) {
+		if (conn->c_trans->xmit_path_prepare)
+			conn->c_trans->xmit_path_prepare(cp);
+	} else if (conn->c_trans->xmit_prepare) {
 		conn->c_trans->xmit_prepare(conn);
+	}
 
 	/*
 	 * spin trying to push headers and data down the connection until
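The t_mp_capable test selects between the path-aware and legacy hooks at each call site, and while the series is in flux this branching is repeated at prepare and complete time. A follow-up could centralize it in a wrapper; a hypothetical sketch (not part of this commit):

/* Hypothetical helper: the same dispatch rds_send_xmit() open-codes. */
static void rds_xmit_prepare(struct rds_conn_path *cp)
{
        struct rds_connection *conn = cp->cp_conn;

        if (conn->c_trans->t_mp_capable) {
                if (conn->c_trans->xmit_path_prepare)
                        conn->c_trans->xmit_path_prepare(cp);
        } else if (conn->c_trans->xmit_prepare) {
                conn->c_trans->xmit_prepare(conn);
        }
}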
@@ -197,7 +202,7 @@ int rds_send_xmit(struct rds_connection *conn)
 	 */
 	while (1) {
 
-		rm = conn->c_xmit_rm;
+		rm = cp->cp_xmit_rm;
 
 		/*
 		 * If between sending messages, we can send a pending congestion
@@ -210,14 +215,16 @@ int rds_send_xmit(struct rds_connection *conn)
 				break;
 			}
 			rm->data.op_active = 1;
+			rm->m_inc.i_conn_path = cp;
+			rm->m_inc.i_conn = cp->cp_conn;
 
-			conn->c_xmit_rm = rm;
+			cp->cp_xmit_rm = rm;
 		}
 
 		/*
 		 * If not already working on one, grab the next message.
 		 *
-		 * c_xmit_rm holds a ref while we're sending this message down
+		 * cp_xmit_rm holds a ref while we're sending this message down
 		 * the connction. We can use this ref while holding the
 		 * send_sem.. rds_send_reset() is serialized with it.
 		 */
@@ -234,10 +241,10 @@ int rds_send_xmit(struct rds_connection *conn)
 			if (batch_count >= send_batch_count)
 				goto over_batch;
 
-			spin_lock_irqsave(&conn->c_lock, flags);
+			spin_lock_irqsave(&cp->cp_lock, flags);
 
-			if (!list_empty(&conn->c_send_queue)) {
-				rm = list_entry(conn->c_send_queue.next,
+			if (!list_empty(&cp->cp_send_queue)) {
+				rm = list_entry(cp->cp_send_queue.next,
 						struct rds_message,
 						m_conn_item);
 				rds_message_addref(rm);
@@ -246,10 +253,11 @@ int rds_send_xmit(struct rds_connection *conn)
 				 * Move the message from the send queue to the retransmit
 				 * list right away.
 				 */
-				list_move_tail(&rm->m_conn_item, &conn->c_retrans);
+				list_move_tail(&rm->m_conn_item,
+					       &cp->cp_retrans);
 			}
 
-			spin_unlock_irqrestore(&conn->c_lock, flags);
+			spin_unlock_irqrestore(&cp->cp_lock, flags);
 
 			if (!rm)
 				break;
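Two things worth noting in the hunks above. First, batch_count caps how many messages one caller pushes before jumping to over_batch and handing the remainder to the workqueue, so a busy sender cannot monopolize a CPU inside this loop; send_batch_count is a read-only module parameter of rds (default 1024 in kernels of this era, if memory serves). Second, a message moves from cp_send_queue to cp_retrans the moment it is picked up, before the transport ever sees it, so a lost ACK still finds it on the retransmit list.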
@@ -263,32 +271,34 @@ int rds_send_xmit(struct rds_connection *conn)
 		 */
 		if (rm->rdma.op_active &&
 		    test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
-			spin_lock_irqsave(&conn->c_lock, flags);
+			spin_lock_irqsave(&cp->cp_lock, flags);
 			if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
 				list_move(&rm->m_conn_item, &to_be_dropped);
-			spin_unlock_irqrestore(&conn->c_lock, flags);
+			spin_unlock_irqrestore(&cp->cp_lock, flags);
 			continue;
 		}
 
 		/* Require an ACK every once in a while */
 		len = ntohl(rm->m_inc.i_hdr.h_len);
-		if (conn->c_unacked_packets == 0 ||
-		    conn->c_unacked_bytes < len) {
+		if (cp->cp_unacked_packets == 0 ||
+		    cp->cp_unacked_bytes < len) {
 			__set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
 
-			conn->c_unacked_packets = rds_sysctl_max_unacked_packets;
-			conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes;
+			cp->cp_unacked_packets =
+				rds_sysctl_max_unacked_packets;
+			cp->cp_unacked_bytes =
+				rds_sysctl_max_unacked_bytes;
 			rds_stats_inc(s_send_ack_required);
 		} else {
-			conn->c_unacked_bytes -= len;
-			conn->c_unacked_packets--;
+			cp->cp_unacked_bytes -= len;
+			cp->cp_unacked_packets--;
 		}
 
-		conn->c_xmit_rm = rm;
+		cp->cp_xmit_rm = rm;
 	}
 
 	/* The transport either sends the whole rdma or none of it */
-	if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
+	if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
 		rm->m_final_op = &rm->rdma;
 		/* The transport owns the mapped memory for now.
 		 * You can't unmap it while it's on the send queue
@@ -300,11 +310,11 @@ int rds_send_xmit(struct rds_connection *conn)
 			wake_up_interruptible(&rm->m_flush_wait);
 			break;
 		}
-		conn->c_xmit_rdma_sent = 1;
+		cp->cp_xmit_rdma_sent = 1;
 	}
 
-	if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
+	if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
 		rm->m_final_op = &rm->atomic;
 		/* The transport owns the mapped memory for now.
 		 * You can't unmap it while it's on the send queue
@@ -316,7 +326,7 @@ int rds_send_xmit(struct rds_connection *conn)
 			wake_up_interruptible(&rm->m_flush_wait);
 			break;
 		}
-		conn->c_xmit_atomic_sent = 1;
+		cp->cp_xmit_atomic_sent = 1;
 	}
 
@@ -342,41 +352,42 @@ int rds_send_xmit(struct rds_connection *conn)
 			rm->data.op_active = 0;
 		}
 
-		if (rm->data.op_active && !conn->c_xmit_data_sent) {
+		if (rm->data.op_active && !cp->cp_xmit_data_sent) {
 			rm->m_final_op = &rm->data;
+
 			ret = conn->c_trans->xmit(conn, rm,
-						  conn->c_xmit_hdr_off,
-						  conn->c_xmit_sg,
-						  conn->c_xmit_data_off);
+						  cp->cp_xmit_hdr_off,
+						  cp->cp_xmit_sg,
+						  cp->cp_xmit_data_off);
 			if (ret <= 0)
 				break;
 
-			if (conn->c_xmit_hdr_off < sizeof(struct rds_header)) {
+			if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
 				tmp = min_t(int, ret,
 					    sizeof(struct rds_header) -
-					    conn->c_xmit_hdr_off);
-				conn->c_xmit_hdr_off += tmp;
+					    cp->cp_xmit_hdr_off);
+				cp->cp_xmit_hdr_off += tmp;
 				ret -= tmp;
 			}
 
-			sg = &rm->data.op_sg[conn->c_xmit_sg];
+			sg = &rm->data.op_sg[cp->cp_xmit_sg];
 			while (ret) {
 				tmp = min_t(int, ret, sg->length -
-						      conn->c_xmit_data_off);
-				conn->c_xmit_data_off += tmp;
+						      cp->cp_xmit_data_off);
+				cp->cp_xmit_data_off += tmp;
 				ret -= tmp;
-				if (conn->c_xmit_data_off == sg->length) {
-					conn->c_xmit_data_off = 0;
+				if (cp->cp_xmit_data_off == sg->length) {
+					cp->cp_xmit_data_off = 0;
 					sg++;
-					conn->c_xmit_sg++;
-					BUG_ON(ret != 0 &&
-					       conn->c_xmit_sg == rm->data.op_nents);
+					cp->cp_xmit_sg++;
+					BUG_ON(ret != 0 && cp->cp_xmit_sg ==
+					       rm->data.op_nents);
 				}
 			}
 
-			if (conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
-			    (conn->c_xmit_sg == rm->data.op_nents))
-				conn->c_xmit_data_sent = 1;
+			if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
+			    (cp->cp_xmit_sg == rm->data.op_nents))
+				cp->cp_xmit_data_sent = 1;
 		}
 
 		/*
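The header/data bookkeeping in the last hunk is a resumable scatter-gather cursor: the transport accepted ret bytes, and the loop advances (cp_xmit_sg, cp_xmit_data_off) so a later call can resume mid-message. Detached from the kernel types, the idea is the following simplified sketch (illustrative plain C, not kernel code):

/* Advance a (segment index, intra-segment offset) cursor by "sent"
 * bytes across an array of segment lengths -- a simplified version of
 * the accounting rds_send_xmit() does on cp_xmit_sg/cp_xmit_data_off.
 */
struct sg_cursor { unsigned int seg, off; };

static void sg_advance(struct sg_cursor *cur, const unsigned int *len,
                       unsigned int nsegs, unsigned int sent)
{
        while (sent) {
                unsigned int tmp = sent;

                if (tmp > len[cur->seg] - cur->off)
                        tmp = len[cur->seg] - cur->off;
                cur->off += tmp;
                sent -= tmp;
                if (cur->off == len[cur->seg]) {
                        cur->off = 0;
                        cur->seg++; /* must not pass nsegs while data remains */
                }
        }
}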
@@ -384,23 +395,27 @@ int rds_send_xmit(struct rds_connection *conn)
 		 * if there is a data op. Thus, if the data is sent (or there was
 		 * none), then we're done with the rm.
 		 */
-		if (!rm->data.op_active || conn->c_xmit_data_sent) {
-			conn->c_xmit_rm = NULL;
-			conn->c_xmit_sg = 0;
-			conn->c_xmit_hdr_off = 0;
-			conn->c_xmit_data_off = 0;
-			conn->c_xmit_rdma_sent = 0;
-			conn->c_xmit_atomic_sent = 0;
-			conn->c_xmit_data_sent = 0;
+		if (!rm->data.op_active || cp->cp_xmit_data_sent) {
+			cp->cp_xmit_rm = NULL;
+			cp->cp_xmit_sg = 0;
+			cp->cp_xmit_hdr_off = 0;
+			cp->cp_xmit_data_off = 0;
+			cp->cp_xmit_rdma_sent = 0;
+			cp->cp_xmit_atomic_sent = 0;
+			cp->cp_xmit_data_sent = 0;
 
 			rds_message_put(rm);
 		}
 	}
 
 over_batch:
-	if (conn->c_trans->xmit_complete)
+	if (conn->c_trans->t_mp_capable) {
+		if (conn->c_trans->xmit_path_complete)
+			conn->c_trans->xmit_path_complete(cp);
+	} else if (conn->c_trans->xmit_complete) {
 		conn->c_trans->xmit_complete(conn);
-	release_in_xmit(conn);
+	}
+	release_in_xmit(cp);
 
 	/* Nuke any messages we decided not to retransmit. */
 	if (!list_empty(&to_be_dropped)) {
@@ -428,12 +443,12 @@ int rds_send_xmit(struct rds_connection *conn)
 	if (ret == 0) {
 		smp_mb();
 		if ((test_bit(0, &conn->c_map_queued) ||
-		     !list_empty(&conn->c_send_queue)) &&
-		    send_gen == conn->c_send_gen) {
+		     !list_empty(&cp->cp_send_queue)) &&
+		    send_gen == cp->cp_send_gen) {
 			rds_stats_inc(s_send_lock_queue_raced);
 			if (batch_count < send_batch_count)
 				goto restart;
-			queue_delayed_work(rds_wq, &conn->c_send_w, 1);
+			queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
 		}
 	}
 out:
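The send_gen comparison closes a wakeup race that is now per-path: if another CPU queued a message and bumped cp_send_gen after this caller's final pass over the queue but before release_in_xmit(), the generations differ and that other transmitter owns the new work; if they match while the queue is non-empty, nobody else has seen it, so this caller must restart the loop or requeue the send work itself.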
@@ -1110,9 +1125,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 	 */
 	rds_stats_inc(s_send_queued);
 
-	ret = rds_send_xmit(conn);
+	ret = rds_send_xmit(cpath);
 	if (ret == -ENOMEM || ret == -EAGAIN)
-		queue_delayed_work(rds_wq, &conn->c_send_w, 1);
+		queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
 
 	rds_message_put(rm);
 	return payload_len;
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -177,7 +177,7 @@ void rds_send_worker(struct work_struct *work)
 	if (rds_conn_path_state(cp) == RDS_CONN_UP) {
 		clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
-		ret = rds_send_xmit(cp->cp_conn);
+		ret = rds_send_xmit(cp);
 		cond_resched();
 		rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
 		switch (ret) {
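With this change the per-path send worker no longer detours through cp->cp_conn: the work struct (cp_send_w), the RDS_LL_SEND_FULL flag, and the transmit state it drives all hang off the same rds_conn_path that rds_send_xmit() now takes.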