Commit e6727f39 authored by Ursula Braun's avatar Ursula Braun Committed by David S. Miller

smc: send data (through RDMA)

copy data to kernel send buffer, and trigger RDMA write
Signed-off-by: default avatarUrsula Braun <ubraun@linux.vnet.ibm.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 5f08318f
obj-$(CONFIG_SMC) += smc.o obj-$(CONFIG_SMC) += smc.o
smc-y := af_smc.o smc_pnet.o smc_ib.o smc_clc.o smc_core.o smc_wr.o smc_llc.o smc-y := af_smc.o smc_pnet.o smc_ib.o smc_clc.o smc_core.o smc_wr.o smc_llc.o
smc-y += smc_cdc.o smc-y += smc_cdc.o smc_tx.o
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include "smc_core.h" #include "smc_core.h"
#include "smc_ib.h" #include "smc_ib.h"
#include "smc_pnet.h" #include "smc_pnet.h"
#include "smc_tx.h"
static DEFINE_MUTEX(smc_create_lgr_pending); /* serialize link group static DEFINE_MUTEX(smc_create_lgr_pending); /* serialize link group
* creation * creation
...@@ -410,6 +411,8 @@ static int smc_connect_rdma(struct smc_sock *smc) ...@@ -410,6 +411,8 @@ static int smc_connect_rdma(struct smc_sock *smc)
} }
mutex_unlock(&smc_create_lgr_pending); mutex_unlock(&smc_create_lgr_pending);
smc_tx_init(smc);
out_connected: out_connected:
smc_copy_sock_settings_to_clc(smc); smc_copy_sock_settings_to_clc(smc);
smc->sk.sk_state = SMC_ACTIVE; smc->sk.sk_state = SMC_ACTIVE;
...@@ -751,6 +754,8 @@ static void smc_listen_work(struct work_struct *work) ...@@ -751,6 +754,8 @@ static void smc_listen_work(struct work_struct *work)
goto decline_rdma; goto decline_rdma;
} }
smc_tx_init(new_smc);
out_connected: out_connected:
sk_refcnt_debug_inc(newsmcsk); sk_refcnt_debug_inc(newsmcsk);
newsmcsk->sk_state = SMC_ACTIVE; newsmcsk->sk_state = SMC_ACTIVE;
...@@ -924,7 +929,7 @@ static int smc_sendmsg(struct socket *sock, struct msghdr *msg, size_t len) ...@@ -924,7 +929,7 @@ static int smc_sendmsg(struct socket *sock, struct msghdr *msg, size_t len)
if (smc->use_fallback) if (smc->use_fallback)
rc = smc->clcsock->ops->sendmsg(smc->clcsock, msg, len); rc = smc->clcsock->ops->sendmsg(smc->clcsock, msg, len);
else else
rc = sock_no_sendmsg(sock, msg, len); rc = smc_tx_sendmsg(smc, msg, len);
out: out:
release_sock(sk); release_sock(sk);
return rc; return rc;
...@@ -1005,6 +1010,12 @@ static unsigned int smc_poll(struct file *file, struct socket *sock, ...@@ -1005,6 +1010,12 @@ static unsigned int smc_poll(struct file *file, struct socket *sock,
mask |= smc_accept_poll(sk); mask |= smc_accept_poll(sk);
if (sk->sk_err) if (sk->sk_err)
mask |= POLLERR; mask |= POLLERR;
if (atomic_read(&smc->conn.sndbuf_space)) {
mask |= POLLOUT | POLLWRNORM;
} else {
sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
}
/* for now - to be enhanced in follow-on patch */ /* for now - to be enhanced in follow-on patch */
} }
......
...@@ -133,6 +133,7 @@ struct smc_connection { ...@@ -133,6 +133,7 @@ struct smc_connection {
atomic_t sndbuf_space; /* remaining space in sndbuf */ atomic_t sndbuf_space; /* remaining space in sndbuf */
u16 tx_cdc_seq; /* sequence # for CDC send */ u16 tx_cdc_seq; /* sequence # for CDC send */
spinlock_t send_lock; /* protect wr_sends */ spinlock_t send_lock; /* protect wr_sends */
struct work_struct tx_work; /* retry of smc_cdc_msg_send */
struct smc_host_cdc_msg local_rx_ctrl; /* filled during event_handl. struct smc_host_cdc_msg local_rx_ctrl; /* filled during event_handl.
* .prod cf. TCP rcv_nxt * .prod cf. TCP rcv_nxt
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#include "smc.h" #include "smc.h"
#include "smc_wr.h" #include "smc_wr.h"
#include "smc_cdc.h" #include "smc_cdc.h"
#include "smc_tx.h"
/********************************** send *************************************/ /********************************** send *************************************/
...@@ -52,7 +53,7 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd, ...@@ -52,7 +53,7 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
smc_curs_read(&cdcpend->cursor, cdcpend->conn), smc_curs_read(&cdcpend->cursor, cdcpend->conn),
cdcpend->conn); cdcpend->conn);
} }
/* subsequent patch: wake if send buffer space available */ smc_tx_sndbuf_nonfull(smc);
bh_unlock_sock(&smc->sk); bh_unlock_sock(&smc->sk);
} }
...@@ -204,7 +205,9 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc, ...@@ -204,7 +205,9 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc,
/* subsequent patch: terminate connection */ /* subsequent patch: terminate connection */
/* piggy backed tx info */ /* piggy backed tx info */
/* subsequent patch: wake receivers if receive buffer space available */ /* trigger sndbuf consumer: RDMA write into peer RMBE and CDC */
if (diff_cons && smc_tx_prepared_sends(conn))
smc_tx_sndbuf_nonempty(conn);
/* subsequent patch: trigger socket release if connection closed */ /* subsequent patch: trigger socket release if connection closed */
......
This diff is collapsed.
/*
* Shared Memory Communications over RDMA (SMC-R) and RoCE
*
* Manage send buffer
*
* Copyright IBM Corp. 2016
*
* Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com>
*/
#ifndef SMC_TX_H
#define SMC_TX_H
#include <linux/socket.h>
#include <linux/types.h>
#include "smc.h"
#include "smc_cdc.h"
static inline int smc_tx_prepared_sends(struct smc_connection *conn)
{
union smc_host_cursor sent, prep;
smc_curs_write(&sent, smc_curs_read(&conn->tx_curs_sent, conn), conn);
smc_curs_write(&prep, smc_curs_read(&conn->tx_curs_prep, conn), conn);
return smc_curs_diff(conn->sndbuf_size, &sent, &prep);
}
void smc_tx_init(struct smc_sock *smc);
int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len);
int smc_tx_sndbuf_nonempty(struct smc_connection *conn);
void smc_tx_sndbuf_nonfull(struct smc_sock *smc);
#endif /* SMC_TX_H */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment