Commit de8474eb authored by Stefan Raspl's avatar Stefan Raspl Committed by David S. Miller

net/smc: urgent data support

Add support for out of band data send and receive.
Signed-off-by: default avatarStefan Raspl <raspl@linux.ibm.com>
Signed-off-by: default avatarUrsula Braun <ubraun@linux.ibm.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent b9f227c3
...@@ -8,8 +8,6 @@ ...@@ -8,8 +8,6 @@
* *
* Initial restrictions: * Initial restrictions:
* - support for alternate links postponed * - support for alternate links postponed
* - partial support for non-blocking sockets only
* - support for urgent data postponed
* *
* Copyright IBM Corp. 2016, 2018 * Copyright IBM Corp. 2016, 2018
* *
...@@ -1338,6 +1336,8 @@ static __poll_t smc_poll(struct file *file, struct socket *sock, ...@@ -1338,6 +1336,8 @@ static __poll_t smc_poll(struct file *file, struct socket *sock,
if (sk->sk_state == SMC_APPCLOSEWAIT1) if (sk->sk_state == SMC_APPCLOSEWAIT1)
mask |= EPOLLIN; mask |= EPOLLIN;
} }
if (smc->conn.urg_state == SMC_URG_VALID)
mask |= EPOLLPRI;
} }
release_sock(sk); release_sock(sk);
...@@ -1477,10 +1477,13 @@ static int smc_getsockopt(struct socket *sock, int level, int optname, ...@@ -1477,10 +1477,13 @@ static int smc_getsockopt(struct socket *sock, int level, int optname,
static int smc_ioctl(struct socket *sock, unsigned int cmd, static int smc_ioctl(struct socket *sock, unsigned int cmd,
unsigned long arg) unsigned long arg)
{ {
union smc_host_cursor cons, urg;
struct smc_connection *conn;
struct smc_sock *smc; struct smc_sock *smc;
int answ; int answ;
smc = smc_sk(sock->sk); smc = smc_sk(sock->sk);
conn = &smc->conn;
if (smc->use_fallback) { if (smc->use_fallback) {
if (!smc->clcsock) if (!smc->clcsock)
return -EBADF; return -EBADF;
...@@ -1517,6 +1520,23 @@ static int smc_ioctl(struct socket *sock, unsigned int cmd, ...@@ -1517,6 +1520,23 @@ static int smc_ioctl(struct socket *sock, unsigned int cmd,
else else
answ = smc_tx_prepared_sends(&smc->conn); answ = smc_tx_prepared_sends(&smc->conn);
break; break;
case SIOCATMARK:
if (smc->sk.sk_state == SMC_LISTEN)
return -EINVAL;
if (smc->sk.sk_state == SMC_INIT ||
smc->sk.sk_state == SMC_CLOSED) {
answ = 0;
} else {
smc_curs_write(&cons,
smc_curs_read(&conn->local_tx_ctrl.cons, conn),
conn);
smc_curs_write(&urg,
smc_curs_read(&conn->urg_curs, conn),
conn);
answ = smc_curs_diff(conn->rmb_desc->len,
&cons, &urg) == 1;
}
break;
default: default:
return -ENOIOCTLCMD; return -ENOIOCTLCMD;
} }
......
...@@ -114,6 +114,12 @@ struct smc_host_cdc_msg { /* Connection Data Control message */ ...@@ -114,6 +114,12 @@ struct smc_host_cdc_msg { /* Connection Data Control message */
u8 reserved[18]; u8 reserved[18];
} __aligned(8); } __aligned(8);
enum smc_urg_state {
SMC_URG_VALID, /* data present */
SMC_URG_NOTYET, /* data pending */
SMC_URG_READ /* data was already read */
};
struct smc_connection { struct smc_connection {
struct rb_node alert_node; struct rb_node alert_node;
struct smc_link_group *lgr; /* link group of connection */ struct smc_link_group *lgr; /* link group of connection */
...@@ -160,6 +166,15 @@ struct smc_connection { ...@@ -160,6 +166,15 @@ struct smc_connection {
union smc_host_cursor rx_curs_confirmed; /* confirmed to peer union smc_host_cursor rx_curs_confirmed; /* confirmed to peer
* source of snd_una ? * source of snd_una ?
*/ */
union smc_host_cursor urg_curs; /* points at urgent byte */
enum smc_urg_state urg_state;
bool urg_tx_pend; /* urgent data staged */
bool urg_rx_skip_pend;
/* indicate urgent oob data
* read, but previous regular
* data still pending
*/
char urg_rx_byte; /* urgent byte */
atomic_t bytes_to_rcv; /* arrived data, atomic_t bytes_to_rcv; /* arrived data,
* not yet received * not yet received
*/ */
......
...@@ -164,6 +164,28 @@ static inline bool smc_cdc_before(u16 seq1, u16 seq2) ...@@ -164,6 +164,28 @@ static inline bool smc_cdc_before(u16 seq1, u16 seq2)
return (s16)(seq1 - seq2) < 0; return (s16)(seq1 - seq2) < 0;
} }
static void smc_cdc_handle_urg_data_arrival(struct smc_sock *smc,
int *diff_prod)
{
struct smc_connection *conn = &smc->conn;
char *base;
/* new data included urgent business */
smc_curs_write(&conn->urg_curs,
smc_curs_read(&conn->local_rx_ctrl.prod, conn),
conn);
conn->urg_state = SMC_URG_VALID;
if (!sock_flag(&smc->sk, SOCK_URGINLINE))
/* we'll skip the urgent byte, so don't account for it */
(*diff_prod)--;
base = (char *)conn->rmb_desc->cpu_addr;
if (conn->urg_curs.count)
conn->urg_rx_byte = *(base + conn->urg_curs.count - 1);
else
conn->urg_rx_byte = *(base + conn->rmb_desc->len - 1);
sk_send_sigurg(&smc->sk);
}
static void smc_cdc_msg_recv_action(struct smc_sock *smc, static void smc_cdc_msg_recv_action(struct smc_sock *smc,
struct smc_cdc_msg *cdc) struct smc_cdc_msg *cdc)
{ {
...@@ -194,15 +216,25 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc, ...@@ -194,15 +216,25 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc,
diff_prod = smc_curs_diff(conn->rmb_desc->len, &prod_old, diff_prod = smc_curs_diff(conn->rmb_desc->len, &prod_old,
&conn->local_rx_ctrl.prod); &conn->local_rx_ctrl.prod);
if (diff_prod) { if (diff_prod) {
if (conn->local_rx_ctrl.prod_flags.urg_data_present)
smc_cdc_handle_urg_data_arrival(smc, &diff_prod);
/* bytes_to_rcv is decreased in smc_recvmsg */ /* bytes_to_rcv is decreased in smc_recvmsg */
smp_mb__before_atomic(); smp_mb__before_atomic();
atomic_add(diff_prod, &conn->bytes_to_rcv); atomic_add(diff_prod, &conn->bytes_to_rcv);
/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */ /* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
smp_mb__after_atomic(); smp_mb__after_atomic();
smc->sk.sk_data_ready(&smc->sk); smc->sk.sk_data_ready(&smc->sk);
} else if ((conn->local_rx_ctrl.prod_flags.write_blocked) || } else {
(conn->local_rx_ctrl.prod_flags.cons_curs_upd_req)) { if (conn->local_rx_ctrl.prod_flags.write_blocked ||
smc->sk.sk_data_ready(&smc->sk); conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
conn->local_rx_ctrl.prod_flags.urg_data_pending) {
if (conn->local_rx_ctrl.prod_flags.urg_data_pending)
conn->urg_state = SMC_URG_NOTYET;
/* force immediate tx of current consumer cursor, but
* under send_lock to guarantee arrival in seqno-order
*/
smc_tx_sndbuf_nonempty(conn);
}
} }
/* piggy backed tx info */ /* piggy backed tx info */
...@@ -212,6 +244,12 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc, ...@@ -212,6 +244,12 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc,
/* trigger socket release if connection closed */ /* trigger socket release if connection closed */
smc_close_wake_tx_prepared(smc); smc_close_wake_tx_prepared(smc);
} }
if (diff_cons && conn->urg_tx_pend &&
atomic_read(&conn->peer_rmbe_space) == conn->peer_rmbe_size) {
/* urg data confirmed by peer, indicate we're ready for more */
conn->urg_tx_pend = false;
smc->sk.sk_write_space(&smc->sk);
}
if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) { if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
smc->sk.sk_err = ECONNRESET; smc->sk.sk_err = ECONNRESET;
......
...@@ -146,6 +146,19 @@ static inline int smc_curs_diff(unsigned int size, ...@@ -146,6 +146,19 @@ static inline int smc_curs_diff(unsigned int size,
return max_t(int, 0, (new->count - old->count)); return max_t(int, 0, (new->count - old->count));
} }
/* calculate cursor difference between old and new - returns negative
* value in case old > new
*/
static inline int smc_curs_comp(unsigned int size,
union smc_host_cursor *old,
union smc_host_cursor *new)
{
if (old->wrap > new->wrap ||
(old->wrap == new->wrap && old->count > new->count))
return -smc_curs_diff(size, new, old);
return smc_curs_diff(size, old, new);
}
static inline void smc_host_cursor_to_cdc(union smc_cdc_cursor *peer, static inline void smc_host_cursor_to_cdc(union smc_cdc_cursor *peer,
union smc_host_cursor *local, union smc_host_cursor *local,
struct smc_connection *conn) struct smc_connection *conn)
......
...@@ -544,6 +544,7 @@ int smc_conn_create(struct smc_sock *smc, ...@@ -544,6 +544,7 @@ int smc_conn_create(struct smc_sock *smc,
} }
conn->local_tx_ctrl.common.type = SMC_CDC_MSG_TYPE; conn->local_tx_ctrl.common.type = SMC_CDC_MSG_TYPE;
conn->local_tx_ctrl.len = SMC_WR_TX_SIZE; conn->local_tx_ctrl.len = SMC_WR_TX_SIZE;
conn->urg_state = SMC_URG_READ;
#ifndef KERNEL_HAS_ATOMIC64 #ifndef KERNEL_HAS_ATOMIC64
spin_lock_init(&conn->acurs_lock); spin_lock_init(&conn->acurs_lock);
#endif #endif
......
...@@ -47,16 +47,59 @@ static void smc_rx_wake_up(struct sock *sk) ...@@ -47,16 +47,59 @@ static void smc_rx_wake_up(struct sock *sk)
* @conn connection to update * @conn connection to update
* @cons consumer cursor * @cons consumer cursor
* @len number of Bytes consumed * @len number of Bytes consumed
* Returns:
* 1 if we should end our receive, 0 otherwise
*/ */
static void smc_rx_update_consumer(struct smc_connection *conn, static int smc_rx_update_consumer(struct smc_sock *smc,
union smc_host_cursor cons, size_t len) union smc_host_cursor cons, size_t len)
{ {
struct smc_connection *conn = &smc->conn;
struct sock *sk = &smc->sk;
bool force = false;
int diff, rc = 0;
smc_curs_add(conn->rmb_desc->len, &cons, len); smc_curs_add(conn->rmb_desc->len, &cons, len);
/* did we process urgent data? */
if (conn->urg_state == SMC_URG_VALID || conn->urg_rx_skip_pend) {
diff = smc_curs_comp(conn->rmb_desc->len, &cons,
&conn->urg_curs);
if (sock_flag(sk, SOCK_URGINLINE)) {
if (diff == 0) {
force = true;
rc = 1;
conn->urg_state = SMC_URG_READ;
}
} else {
if (diff == 1) {
/* skip urgent byte */
force = true;
smc_curs_add(conn->rmb_desc->len, &cons, 1);
conn->urg_rx_skip_pend = false;
} else if (diff < -1)
/* we read past urgent byte */
conn->urg_state = SMC_URG_READ;
}
}
smc_curs_write(&conn->local_tx_ctrl.cons, smc_curs_read(&cons, conn), smc_curs_write(&conn->local_tx_ctrl.cons, smc_curs_read(&cons, conn),
conn); conn);
/* send consumer cursor update if required */ /* send consumer cursor update if required */
/* similar to advertising new TCP rcv_wnd if required */ /* similar to advertising new TCP rcv_wnd if required */
smc_tx_consumer_update(conn); smc_tx_consumer_update(conn, force);
return rc;
}
static void smc_rx_update_cons(struct smc_sock *smc, size_t len)
{
struct smc_connection *conn = &smc->conn;
union smc_host_cursor cons;
smc_curs_write(&cons, smc_curs_read(&conn->local_tx_ctrl.cons, conn),
conn);
smc_rx_update_consumer(smc, cons, len);
} }
struct smc_spd_priv { struct smc_spd_priv {
...@@ -70,7 +113,6 @@ static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe, ...@@ -70,7 +113,6 @@ static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private; struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private;
struct smc_sock *smc = priv->smc; struct smc_sock *smc = priv->smc;
struct smc_connection *conn; struct smc_connection *conn;
union smc_host_cursor cons;
struct sock *sk = &smc->sk; struct sock *sk = &smc->sk;
if (sk->sk_state == SMC_CLOSED || if (sk->sk_state == SMC_CLOSED ||
...@@ -79,9 +121,7 @@ static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe, ...@@ -79,9 +121,7 @@ static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
goto out; goto out;
conn = &smc->conn; conn = &smc->conn;
lock_sock(sk); lock_sock(sk);
smc_curs_write(&cons, smc_curs_read(&conn->local_tx_ctrl.cons, conn), smc_rx_update_cons(smc, priv->len);
conn);
smc_rx_update_consumer(conn, cons, priv->len);
release_sock(sk); release_sock(sk);
if (atomic_sub_and_test(priv->len, &conn->splice_pending)) if (atomic_sub_and_test(priv->len, &conn->splice_pending))
smc_rx_wake_up(sk); smc_rx_wake_up(sk);
...@@ -184,6 +224,52 @@ int smc_rx_wait(struct smc_sock *smc, long *timeo, ...@@ -184,6 +224,52 @@ int smc_rx_wait(struct smc_sock *smc, long *timeo,
return rc; return rc;
} }
static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len,
int flags)
{
struct smc_connection *conn = &smc->conn;
union smc_host_cursor cons;
struct sock *sk = &smc->sk;
int rc = 0;
if (sock_flag(sk, SOCK_URGINLINE) ||
!(conn->urg_state == SMC_URG_VALID) ||
conn->urg_state == SMC_URG_READ)
return -EINVAL;
if (conn->urg_state == SMC_URG_VALID) {
if (!(flags & MSG_PEEK))
smc->conn.urg_state = SMC_URG_READ;
msg->msg_flags |= MSG_OOB;
if (len > 0) {
if (!(flags & MSG_TRUNC))
rc = memcpy_to_msg(msg, &conn->urg_rx_byte, 1);
len = 1;
smc_curs_write(&cons,
smc_curs_read(&conn->local_tx_ctrl.cons,
conn),
conn);
if (smc_curs_diff(conn->rmb_desc->len, &cons,
&conn->urg_curs) > 1)
conn->urg_rx_skip_pend = true;
/* Urgent Byte was already accounted for, but trigger
* skipping the urgent byte in non-inline case
*/
if (!(flags & MSG_PEEK))
smc_rx_update_consumer(smc, cons, 0);
} else {
msg->msg_flags |= MSG_TRUNC;
}
return rc ? -EFAULT : len;
}
if (sk->sk_state == SMC_CLOSED || sk->sk_shutdown & RCV_SHUTDOWN)
return 0;
return -EAGAIN;
}
/* smc_rx_recvmsg - receive data from RMBE /* smc_rx_recvmsg - receive data from RMBE
* @msg: copy data to receive buffer * @msg: copy data to receive buffer
* @pipe: copy data to pipe if set - indicates splice() call * @pipe: copy data to pipe if set - indicates splice() call
...@@ -209,12 +295,12 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, ...@@ -209,12 +295,12 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
if (unlikely(flags & MSG_ERRQUEUE)) if (unlikely(flags & MSG_ERRQUEUE))
return -EINVAL; /* future work for sk.sk_family == AF_SMC */ return -EINVAL; /* future work for sk.sk_family == AF_SMC */
if (flags & MSG_OOB)
return -EINVAL; /* future work */
sk = &smc->sk; sk = &smc->sk;
if (sk->sk_state == SMC_LISTEN) if (sk->sk_state == SMC_LISTEN)
return -ENOTCONN; return -ENOTCONN;
if (flags & MSG_OOB)
return smc_rx_recv_urg(smc, msg, len, flags);
timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len); target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
...@@ -227,6 +313,9 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, ...@@ -227,6 +313,9 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
if (atomic_read(&conn->bytes_to_rcv)) if (atomic_read(&conn->bytes_to_rcv))
goto copy; goto copy;
else if (conn->urg_state == SMC_URG_VALID)
/* we received a single urgent Byte - skip */
smc_rx_update_cons(smc, 0);
if (sk->sk_shutdown & RCV_SHUTDOWN || if (sk->sk_shutdown & RCV_SHUTDOWN ||
smc_cdc_rxed_any_close_or_senddone(conn) || smc_cdc_rxed_any_close_or_senddone(conn) ||
...@@ -281,14 +370,18 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, ...@@ -281,14 +370,18 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
continue; continue;
} }
/* not more than what user space asked for */
copylen = min_t(size_t, read_remaining, readable);
smc_curs_write(&cons, smc_curs_write(&cons,
smc_curs_read(&conn->local_tx_ctrl.cons, conn), smc_curs_read(&conn->local_tx_ctrl.cons, conn),
conn); conn);
/* subsequent splice() calls pick up where previous left */ /* subsequent splice() calls pick up where previous left */
if (splbytes) if (splbytes)
smc_curs_add(conn->rmb_desc->len, &cons, splbytes); smc_curs_add(conn->rmb_desc->len, &cons, splbytes);
if (conn->urg_state == SMC_URG_VALID &&
sock_flag(&smc->sk, SOCK_URGINLINE) &&
readable > 1)
readable--; /* always stop at urgent Byte */
/* not more than what user space asked for */
copylen = min_t(size_t, read_remaining, readable);
/* determine chunks where to read from rcvbuf */ /* determine chunks where to read from rcvbuf */
/* either unwrapped case, or 1st chunk of wrapped case */ /* either unwrapped case, or 1st chunk of wrapped case */
chunk_len = min_t(size_t, copylen, conn->rmb_desc->len - chunk_len = min_t(size_t, copylen, conn->rmb_desc->len -
...@@ -333,8 +426,8 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, ...@@ -333,8 +426,8 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
atomic_sub(copylen, &conn->bytes_to_rcv); atomic_sub(copylen, &conn->bytes_to_rcv);
/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */ /* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
smp_mb__after_atomic(); smp_mb__after_atomic();
if (msg) if (msg && smc_rx_update_consumer(smc, cons, copylen))
smc_rx_update_consumer(conn, cons, copylen); goto out;
} }
} while (read_remaining); } while (read_remaining);
out: out:
...@@ -346,4 +439,5 @@ void smc_rx_init(struct smc_sock *smc) ...@@ -346,4 +439,5 @@ void smc_rx_init(struct smc_sock *smc)
{ {
smc->sk.sk_data_ready = smc_rx_wake_up; smc->sk.sk_data_ready = smc_rx_wake_up;
atomic_set(&smc->conn.splice_pending, 0); atomic_set(&smc->conn.splice_pending, 0);
smc->conn.urg_state = SMC_URG_READ;
} }
...@@ -32,7 +32,7 @@ ...@@ -32,7 +32,7 @@
/***************************** sndbuf producer *******************************/ /***************************** sndbuf producer *******************************/
/* callback implementation for sk.sk_write_space() /* callback implementation for sk.sk_write_space()
* to wakeup sndbuf producers that blocked with smc_tx_wait_memory(). * to wakeup sndbuf producers that blocked with smc_tx_wait().
* called under sk_socket lock. * called under sk_socket lock.
*/ */
static void smc_tx_write_space(struct sock *sk) static void smc_tx_write_space(struct sock *sk)
...@@ -56,7 +56,7 @@ static void smc_tx_write_space(struct sock *sk) ...@@ -56,7 +56,7 @@ static void smc_tx_write_space(struct sock *sk)
} }
} }
/* Wakeup sndbuf producers that blocked with smc_tx_wait_memory(). /* Wakeup sndbuf producers that blocked with smc_tx_wait().
* Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space(). * Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space().
*/ */
void smc_tx_sndbuf_nonfull(struct smc_sock *smc) void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
...@@ -66,8 +66,10 @@ void smc_tx_sndbuf_nonfull(struct smc_sock *smc) ...@@ -66,8 +66,10 @@ void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
smc->sk.sk_write_space(&smc->sk); smc->sk.sk_write_space(&smc->sk);
} }
/* blocks sndbuf producer until at least one byte of free space available */ /* blocks sndbuf producer until at least one byte of free space available
static int smc_tx_wait_memory(struct smc_sock *smc, int flags) * or urgent Byte was consumed
*/
static int smc_tx_wait(struct smc_sock *smc, int flags)
{ {
DEFINE_WAIT_FUNC(wait, woken_wake_function); DEFINE_WAIT_FUNC(wait, woken_wake_function);
struct smc_connection *conn = &smc->conn; struct smc_connection *conn = &smc->conn;
...@@ -103,14 +105,15 @@ static int smc_tx_wait_memory(struct smc_sock *smc, int flags) ...@@ -103,14 +105,15 @@ static int smc_tx_wait_memory(struct smc_sock *smc, int flags)
break; break;
} }
sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk); sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
if (atomic_read(&conn->sndbuf_space)) if (atomic_read(&conn->sndbuf_space) && !conn->urg_tx_pend)
break; /* at least 1 byte of free space available */ break; /* at least 1 byte of free & no urgent data */
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
sk_wait_event(sk, &timeo, sk_wait_event(sk, &timeo,
sk->sk_err || sk->sk_err ||
(sk->sk_shutdown & SEND_SHUTDOWN) || (sk->sk_shutdown & SEND_SHUTDOWN) ||
smc_cdc_rxed_any_close(conn) || smc_cdc_rxed_any_close(conn) ||
atomic_read(&conn->sndbuf_space), (atomic_read(&conn->sndbuf_space) &&
!conn->urg_tx_pend),
&wait); &wait);
} }
remove_wait_queue(sk_sleep(sk), &wait); remove_wait_queue(sk_sleep(sk), &wait);
...@@ -157,8 +160,11 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len) ...@@ -157,8 +160,11 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
if (smc_cdc_rxed_any_close(conn)) if (smc_cdc_rxed_any_close(conn))
return send_done ?: -ECONNRESET; return send_done ?: -ECONNRESET;
if (!atomic_read(&conn->sndbuf_space)) { if (msg->msg_flags & MSG_OOB)
rc = smc_tx_wait_memory(smc, msg->msg_flags); conn->local_tx_ctrl.prod_flags.urg_data_pending = 1;
if (!atomic_read(&conn->sndbuf_space) || conn->urg_tx_pend) {
rc = smc_tx_wait(smc, msg->msg_flags);
if (rc) { if (rc) {
if (send_done) if (send_done)
return send_done; return send_done;
...@@ -168,7 +174,7 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len) ...@@ -168,7 +174,7 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
} }
/* initialize variables for 1st iteration of subsequent loop */ /* initialize variables for 1st iteration of subsequent loop */
/* could be just 1 byte, even after smc_tx_wait_memory above */ /* could be just 1 byte, even after smc_tx_wait above */
writespace = atomic_read(&conn->sndbuf_space); writespace = atomic_read(&conn->sndbuf_space);
/* not more than what user space asked for */ /* not more than what user space asked for */
copylen = min_t(size_t, send_remaining, writespace); copylen = min_t(size_t, send_remaining, writespace);
...@@ -218,6 +224,8 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len) ...@@ -218,6 +224,8 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
/* since we just produced more new data into sndbuf, /* since we just produced more new data into sndbuf,
* trigger sndbuf consumer: RDMA write into peer RMBE and CDC * trigger sndbuf consumer: RDMA write into peer RMBE and CDC
*/ */
if ((msg->msg_flags & MSG_OOB) && !send_remaining)
conn->urg_tx_pend = true;
if ((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc)) && if ((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc)) &&
(atomic_read(&conn->sndbuf_space) > (atomic_read(&conn->sndbuf_space) >
(conn->sndbuf_desc->len >> 1))) (conn->sndbuf_desc->len >> 1)))
...@@ -299,6 +307,7 @@ static int smc_tx_rdma_writes(struct smc_connection *conn) ...@@ -299,6 +307,7 @@ static int smc_tx_rdma_writes(struct smc_connection *conn)
union smc_host_cursor sent, prep, prod, cons; union smc_host_cursor sent, prep, prod, cons;
struct ib_sge sges[SMC_IB_MAX_SEND_SGE]; struct ib_sge sges[SMC_IB_MAX_SEND_SGE];
struct smc_link_group *lgr = conn->lgr; struct smc_link_group *lgr = conn->lgr;
struct smc_cdc_producer_flags *pflags;
int to_send, rmbespace; int to_send, rmbespace;
struct smc_link *link; struct smc_link *link;
dma_addr_t dma_addr; dma_addr_t dma_addr;
...@@ -326,7 +335,8 @@ static int smc_tx_rdma_writes(struct smc_connection *conn) ...@@ -326,7 +335,8 @@ static int smc_tx_rdma_writes(struct smc_connection *conn)
conn); conn);
/* if usable snd_wnd closes ask peer to advertise once it opens again */ /* if usable snd_wnd closes ask peer to advertise once it opens again */
conn->local_tx_ctrl.prod_flags.write_blocked = (to_send >= rmbespace); pflags = &conn->local_tx_ctrl.prod_flags;
pflags->write_blocked = (to_send >= rmbespace);
/* cf. usable snd_wnd */ /* cf. usable snd_wnd */
len = min(to_send, rmbespace); len = min(to_send, rmbespace);
...@@ -391,6 +401,8 @@ static int smc_tx_rdma_writes(struct smc_connection *conn) ...@@ -391,6 +401,8 @@ static int smc_tx_rdma_writes(struct smc_connection *conn)
src_len_sum = src_len; src_len_sum = src_len;
} }
if (conn->urg_tx_pend && len == to_send)
pflags->urg_data_present = 1;
smc_tx_advance_cursors(conn, &prod, &sent, len); smc_tx_advance_cursors(conn, &prod, &sent, len);
/* update connection's cursors with advanced local cursors */ /* update connection's cursors with advanced local cursors */
smc_curs_write(&conn->local_tx_ctrl.prod, smc_curs_write(&conn->local_tx_ctrl.prod,
...@@ -410,6 +422,7 @@ static int smc_tx_rdma_writes(struct smc_connection *conn) ...@@ -410,6 +422,7 @@ static int smc_tx_rdma_writes(struct smc_connection *conn)
*/ */
int smc_tx_sndbuf_nonempty(struct smc_connection *conn) int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
{ {
struct smc_cdc_producer_flags *pflags;
struct smc_cdc_tx_pend *pend; struct smc_cdc_tx_pend *pend;
struct smc_wr_buf *wr_buf; struct smc_wr_buf *wr_buf;
int rc; int rc;
...@@ -433,14 +446,21 @@ int smc_tx_sndbuf_nonempty(struct smc_connection *conn) ...@@ -433,14 +446,21 @@ int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
goto out_unlock; goto out_unlock;
} }
rc = smc_tx_rdma_writes(conn); if (!conn->local_tx_ctrl.prod_flags.urg_data_present) {
if (rc) { rc = smc_tx_rdma_writes(conn);
smc_wr_tx_put_slot(&conn->lgr->lnk[SMC_SINGLE_LINK], if (rc) {
(struct smc_wr_tx_pend_priv *)pend); smc_wr_tx_put_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
goto out_unlock; (struct smc_wr_tx_pend_priv *)pend);
goto out_unlock;
}
} }
rc = smc_cdc_msg_send(conn, wr_buf, pend); rc = smc_cdc_msg_send(conn, wr_buf, pend);
pflags = &conn->local_tx_ctrl.prod_flags;
if (!rc && pflags->urg_data_present) {
pflags->urg_data_pending = 0;
pflags->urg_data_present = 0;
}
out_unlock: out_unlock:
spin_unlock_bh(&conn->send_lock); spin_unlock_bh(&conn->send_lock);
...@@ -473,7 +493,7 @@ void smc_tx_work(struct work_struct *work) ...@@ -473,7 +493,7 @@ void smc_tx_work(struct work_struct *work)
release_sock(&smc->sk); release_sock(&smc->sk);
} }
void smc_tx_consumer_update(struct smc_connection *conn) void smc_tx_consumer_update(struct smc_connection *conn, bool force)
{ {
union smc_host_cursor cfed, cons; union smc_host_cursor cfed, cons;
int to_confirm; int to_confirm;
...@@ -487,6 +507,7 @@ void smc_tx_consumer_update(struct smc_connection *conn) ...@@ -487,6 +507,7 @@ void smc_tx_consumer_update(struct smc_connection *conn)
to_confirm = smc_curs_diff(conn->rmb_desc->len, &cfed, &cons); to_confirm = smc_curs_diff(conn->rmb_desc->len, &cfed, &cons);
if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req || if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
force ||
((to_confirm > conn->rmbe_update_limit) && ((to_confirm > conn->rmbe_update_limit) &&
((to_confirm > (conn->rmb_desc->len / 2)) || ((to_confirm > (conn->rmb_desc->len / 2)) ||
conn->local_rx_ctrl.prod_flags.write_blocked))) { conn->local_rx_ctrl.prod_flags.write_blocked))) {
......
...@@ -32,6 +32,6 @@ void smc_tx_init(struct smc_sock *smc); ...@@ -32,6 +32,6 @@ void smc_tx_init(struct smc_sock *smc);
int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len); int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len);
int smc_tx_sndbuf_nonempty(struct smc_connection *conn); int smc_tx_sndbuf_nonempty(struct smc_connection *conn);
void smc_tx_sndbuf_nonfull(struct smc_sock *smc); void smc_tx_sndbuf_nonfull(struct smc_sock *smc);
void smc_tx_consumer_update(struct smc_connection *conn); void smc_tx_consumer_update(struct smc_connection *conn, bool force);
#endif /* SMC_TX_H */ #endif /* SMC_TX_H */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment