Commit ff3d7d36 authored by Andy Grover's avatar Andy Grover

RDS: Perform unmapping ops in stages

Previously, RDS would wait until the final send WR had completed
and then handle cleanup. With silent ops, we do not know
if an atomic, rdma, or data op will be last. This patch
handles any of these cases by keeping a pointer to the last
op in the message in m_last_op.

When the TX completion event fires, rds dispatches to per-op-type
cleanup functions, and then does whole-message cleanup, if the
last op equalled m_last_op.

This patch also moves towards having op-specific functions take
the op struct, instead of the overall rm struct.

rds_ib_connection has a pointer to keep track of a a partially-
completed data send operation. This patch changes it from an
rds_message pointer to the narrower rm_data_op pointer, and
modifies places that use this pointer as needed.
Signed-off-by: default avatarAndy Grover <andy.grover@oracle.com>
parent aa0a4ef4
...@@ -53,8 +53,7 @@ struct rds_ib_connect_private { ...@@ -53,8 +53,7 @@ struct rds_ib_connect_private {
}; };
struct rds_ib_send_work { struct rds_ib_send_work {
struct rds_message *s_rm; void *s_op;
struct rm_rdma_op *s_op;
struct ib_send_wr s_wr; struct ib_send_wr s_wr;
struct ib_sge s_sge[RDS_IB_MAX_SGE]; struct ib_sge s_sge[RDS_IB_MAX_SGE];
unsigned long s_queued; unsigned long s_queued;
...@@ -92,7 +91,7 @@ struct rds_ib_connection { ...@@ -92,7 +91,7 @@ struct rds_ib_connection {
/* tx */ /* tx */
struct rds_ib_work_ring i_send_ring; struct rds_ib_work_ring i_send_ring;
struct rds_message *i_rm; struct rm_data_op *i_data_op;
struct rds_header *i_send_hdrs; struct rds_header *i_send_hdrs;
u64 i_send_hdrs_dma; u64 i_send_hdrs_dma;
struct rds_ib_send_work *i_sends; struct rds_ib_send_work *i_sends;
...@@ -336,7 +335,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits); ...@@ -336,7 +335,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted); void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted, int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
u32 *adv_credits, int need_posted, int max_posted); u32 *adv_credits, int need_posted, int max_posted);
int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm); int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op);
/* ib_stats.c */ /* ib_stats.c */
DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats); DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
......
...@@ -673,9 +673,12 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) ...@@ -673,9 +673,12 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
BUG_ON(ic->rds_ibdev); BUG_ON(ic->rds_ibdev);
/* Clear pending transmit */ /* Clear pending transmit */
if (ic->i_rm) { if (ic->i_data_op) {
rds_message_put(ic->i_rm); struct rds_message *rm;
ic->i_rm = NULL;
rm = container_of(ic->i_data_op, struct rds_message, data);
rds_message_put(rm);
ic->i_data_op = NULL;
} }
/* Clear the ACK state */ /* Clear the ACK state */
......
...@@ -67,80 +67,122 @@ static void rds_ib_send_complete(struct rds_message *rm, ...@@ -67,80 +67,122 @@ static void rds_ib_send_complete(struct rds_message *rm,
complete(rm, notify_status); complete(rm, notify_status);
} }
static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic, static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
struct rds_ib_send_work *send, struct rm_data_op *op,
int wc_status) int wc_status)
{ {
struct rds_message *rm = send->s_rm; if (op->op_nents)
ib_dma_unmap_sg(ic->i_cm_id->device,
rdsdebug("ic %p send %p rm %p\n", ic, send, rm); op->op_sg, op->op_nents,
DMA_TO_DEVICE);
ib_dma_unmap_sg(ic->i_cm_id->device, }
rm->data.op_sg, rm->data.op_nents,
DMA_TO_DEVICE);
if (rm->rdma.op_active) { static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
struct rm_rdma_op *op = &rm->rdma; struct rm_rdma_op *op,
int wc_status)
{
if (op->op_mapped) {
ib_dma_unmap_sg(ic->i_cm_id->device,
op->op_sg, op->op_nents,
op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
op->op_mapped = 0;
}
if (op->op_mapped) { /* If the user asked for a completion notification on this
ib_dma_unmap_sg(ic->i_cm_id->device, * message, we can implement three different semantics:
op->op_sg, op->op_nents, * 1. Notify when we received the ACK on the RDS message
op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE); * that was queued with the RDMA. This provides reliable
op->op_mapped = 0; * notification of RDMA status at the expense of a one-way
} * packet delay.
* 2. Notify when the IB stack gives us the completion event for
* the RDMA operation.
* 3. Notify when the IB stack gives us the completion event for
* the accompanying RDS messages.
* Here, we implement approach #3. To implement approach #2,
* we would need to take an event for the rdma WR. To implement #1,
* don't call rds_rdma_send_complete at all, and fall back to the notify
* handling in the ACK processing code.
*
* Note: There's no need to explicitly sync any RDMA buffers using
* ib_dma_sync_sg_for_cpu - the completion for the RDMA
* operation itself unmapped the RDMA buffers, which takes care
* of synching.
*/
rds_ib_send_complete(container_of(op, struct rds_message, rdma),
wc_status, rds_rdma_send_complete);
/* If the user asked for a completion notification on this if (op->op_write)
* message, we can implement three different semantics: rds_stats_add(s_send_rdma_bytes, op->op_bytes);
* 1. Notify when we received the ACK on the RDS message else
* that was queued with the RDMA. This provides reliable rds_stats_add(s_recv_rdma_bytes, op->op_bytes);
* notification of RDMA status at the expense of a one-way }
* packet delay.
* 2. Notify when the IB stack gives us the completion event for
* the RDMA operation.
* 3. Notify when the IB stack gives us the completion event for
* the accompanying RDS messages.
* Here, we implement approach #3. To implement approach #2,
* call rds_rdma_send_complete from the cq_handler. To implement #1,
* don't call rds_rdma_send_complete at all, and fall back to the notify
* handling in the ACK processing code.
*
* Note: There's no need to explicitly sync any RDMA buffers using
* ib_dma_sync_sg_for_cpu - the completion for the RDMA
* operation itself unmapped the RDMA buffers, which takes care
* of synching.
*/
rds_ib_send_complete(rm, wc_status, rds_rdma_send_complete);
if (rm->rdma.op_write) static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic,
rds_stats_add(s_send_rdma_bytes, rm->rdma.op_bytes); struct rm_atomic_op *op,
else int wc_status)
rds_stats_add(s_recv_rdma_bytes, rm->rdma.op_bytes); {
/* unmap atomic recvbuf */
if (op->op_mapped) {
ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
DMA_FROM_DEVICE);
op->op_mapped = 0;
} }
if (rm->atomic.op_active) { rds_ib_send_complete(container_of(op, struct rds_message, atomic),
struct rm_atomic_op *op = &rm->atomic; wc_status, rds_atomic_send_complete);
/* unmap atomic recvbuf */
if (op->op_mapped) {
ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
DMA_FROM_DEVICE);
op->op_mapped = 0;
}
rds_ib_send_complete(rm, wc_status, rds_atomic_send_complete); if (op->op_type == RDS_ATOMIC_TYPE_CSWP)
rds_stats_inc(s_atomic_cswp);
else
rds_stats_inc(s_atomic_fadd);
}
if (rm->atomic.op_type == RDS_ATOMIC_TYPE_CSWP) /*
rds_stats_inc(s_atomic_cswp); * Unmap the resources associated with a struct send_work.
else *
rds_stats_inc(s_atomic_fadd); * Returns the rm for no good reason other than it is unobtainable
* other than by switching on wr.opcode, currently, and the caller,
* the event handler, needs it.
*/
static struct rds_message *rds_ib_send_unmap_op(struct rds_ib_connection *ic,
struct rds_ib_send_work *send,
int wc_status)
{
struct rds_message *rm = NULL;
/* In the error case, wc.opcode sometimes contains garbage */
switch (send->s_wr.opcode) {
case IB_WR_SEND:
if (send->s_op) {
rm = container_of(send->s_op, struct rds_message, data);
rds_ib_send_unmap_data(ic, send->s_op, wc_status);
}
break;
case IB_WR_RDMA_WRITE:
case IB_WR_RDMA_READ:
if (send->s_op) {
rm = container_of(send->s_op, struct rds_message, rdma);
rds_ib_send_unmap_rdma(ic, send->s_op, wc_status);
}
break;
case IB_WR_ATOMIC_FETCH_AND_ADD:
case IB_WR_ATOMIC_CMP_AND_SWP:
if (send->s_op) {
rm = container_of(send->s_op, struct rds_message, atomic);
rds_ib_send_unmap_atomic(ic, send->s_op, wc_status);
}
break;
default:
if (printk_ratelimit())
printk(KERN_NOTICE
"RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
__func__, send->s_wr.opcode);
break;
} }
/* If anyone waited for this message to get flushed out, wake send->s_wr.opcode = 0xdead;
* them up now */
rds_message_unmapped(rm);
rds_message_put(rm); return rm;
send->s_rm = NULL;
} }
void rds_ib_send_init_ring(struct rds_ib_connection *ic) void rds_ib_send_init_ring(struct rds_ib_connection *ic)
...@@ -151,7 +193,6 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic) ...@@ -151,7 +193,6 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) { for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
struct ib_sge *sge; struct ib_sge *sge;
send->s_rm = NULL;
send->s_op = NULL; send->s_op = NULL;
send->s_wr.wr_id = i; send->s_wr.wr_id = i;
...@@ -173,9 +214,8 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic) ...@@ -173,9 +214,8 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
u32 i; u32 i;
for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) { for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
if (!send->s_rm || send->s_wr.opcode == 0xdead) if (send->s_op && send->s_wr.opcode != 0xdead)
continue; rds_ib_send_unmap_op(ic, send, IB_WC_WR_FLUSH_ERR);
rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
} }
} }
...@@ -189,6 +229,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context) ...@@ -189,6 +229,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
{ {
struct rds_connection *conn = context; struct rds_connection *conn = context;
struct rds_ib_connection *ic = conn->c_transport_data; struct rds_ib_connection *ic = conn->c_transport_data;
struct rds_message *rm = NULL;
struct ib_wc wc; struct ib_wc wc;
struct rds_ib_send_work *send; struct rds_ib_send_work *send;
u32 completed; u32 completed;
...@@ -222,42 +263,18 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context) ...@@ -222,42 +263,18 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
for (i = 0; i < completed; i++) { for (i = 0; i < completed; i++) {
send = &ic->i_sends[oldest]; send = &ic->i_sends[oldest];
/* In the error case, wc.opcode sometimes contains garbage */ rm = rds_ib_send_unmap_op(ic, send, wc.status);
switch (send->s_wr.opcode) {
case IB_WR_SEND:
case IB_WR_RDMA_WRITE:
case IB_WR_RDMA_READ:
case IB_WR_ATOMIC_FETCH_AND_ADD:
case IB_WR_ATOMIC_CMP_AND_SWP:
if (send->s_rm)
rds_ib_send_unmap_rm(ic, send, wc.status);
break;
default:
if (printk_ratelimit())
printk(KERN_NOTICE
"RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
__func__, send->s_wr.opcode);
break;
}
send->s_wr.opcode = 0xdead;
send->s_wr.num_sge = 1;
if (send->s_queued + HZ/2 < jiffies) if (send->s_queued + HZ/2 < jiffies)
rds_ib_stats_inc(s_ib_tx_stalled); rds_ib_stats_inc(s_ib_tx_stalled);
/* If a RDMA operation produced an error, signal this right if (&send->s_op == &rm->m_final_op) {
* away. If we don't, the subsequent SEND that goes with this /* If anyone waited for this message to get flushed out, wake
* RDMA will be canceled with ERR_WFLUSH, and the application * them up now */
* never learn that the RDMA failed. */ rds_message_unmapped(rm);
if (unlikely(wc.status == IB_WC_REM_ACCESS_ERR && send->s_op)) {
struct rds_message *rm; rds_message_put(rm);
send->s_op = NULL;
rm = rds_send_get_message(conn, send->s_op);
if (rm) {
rds_ib_send_unmap_rm(ic, send, wc.status);
rds_ib_send_complete(rm, wc.status, rds_rdma_send_complete);
rds_message_put(rm);
}
} }
oldest = (oldest + 1) % ic->i_send_ring.w_nr; oldest = (oldest + 1) % ic->i_send_ring.w_nr;
...@@ -512,7 +529,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, ...@@ -512,7 +529,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
} }
/* map the message the first time we see it */ /* map the message the first time we see it */
if (!ic->i_rm) { if (!ic->i_data_op) {
if (rm->data.op_nents) { if (rm->data.op_nents) {
rm->data.op_count = ib_dma_map_sg(dev, rm->data.op_count = ib_dma_map_sg(dev,
rm->data.op_sg, rm->data.op_sg,
...@@ -530,7 +547,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, ...@@ -530,7 +547,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
} }
rds_message_addref(rm); rds_message_addref(rm);
ic->i_rm = rm; ic->i_data_op = &rm->data;
/* Finalize the header */ /* Finalize the header */
if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags)) if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
...@@ -583,7 +600,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, ...@@ -583,7 +600,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
send = &ic->i_sends[pos]; send = &ic->i_sends[pos];
first = send; first = send;
prev = NULL; prev = NULL;
scat = &rm->data.op_sg[sg]; scat = &ic->i_data_op->op_sg[sg];
i = 0; i = 0;
do { do {
unsigned int len = 0; unsigned int len = 0;
...@@ -658,9 +675,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, ...@@ -658,9 +675,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
/* if we finished the message then send completion owns it */ /* if we finished the message then send completion owns it */
if (scat == &rm->data.op_sg[rm->data.op_count]) { if (scat == &rm->data.op_sg[rm->data.op_count]) {
prev->s_rm = ic->i_rm; prev->s_op = ic->i_data_op;
prev->s_wr.send_flags |= IB_SEND_SOLICITED; prev->s_wr.send_flags |= IB_SEND_SOLICITED;
ic->i_rm = NULL; ic->i_data_op = NULL;
} }
/* Put back wrs & credits we didn't use */ /* Put back wrs & credits we didn't use */
...@@ -681,9 +698,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, ...@@ -681,9 +698,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 " printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
"returned %d\n", &conn->c_faddr, ret); "returned %d\n", &conn->c_faddr, ret);
rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
if (prev->s_rm) { if (prev->s_op) {
ic->i_rm = prev->s_rm; ic->i_data_op = prev->s_op;
prev->s_rm = NULL; prev->s_op = NULL;
} }
rds_ib_conn_error(ic->conn, "ib_post_send failed\n"); rds_ib_conn_error(ic->conn, "ib_post_send failed\n");
...@@ -701,10 +718,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, ...@@ -701,10 +718,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
* A simplified version of the rdma case, we always map 1 SG, and * A simplified version of the rdma case, we always map 1 SG, and
* only 8 bytes, for the return value from the atomic operation. * only 8 bytes, for the return value from the atomic operation.
*/ */
int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm) int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
{ {
struct rds_ib_connection *ic = conn->c_transport_data; struct rds_ib_connection *ic = conn->c_transport_data;
struct rm_atomic_op *op = &rm->atomic;
struct rds_ib_send_work *send = NULL; struct rds_ib_send_work *send = NULL;
struct ib_send_wr *failed_wr; struct ib_send_wr *failed_wr;
struct rds_ib_device *rds_ibdev; struct rds_ib_device *rds_ibdev;
...@@ -741,14 +757,6 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm) ...@@ -741,14 +757,6 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm)
send->s_wr.wr.atomic.remote_addr = op->op_remote_addr; send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
send->s_wr.wr.atomic.rkey = op->op_rkey; send->s_wr.wr.atomic.rkey = op->op_rkey;
/*
* If there is no data or rdma ops in the message, then
* we must fill in s_rm ourselves, so we properly clean up
* on completion.
*/
if (!rm->rdma.op_active && !rm->data.op_active)
send->s_rm = rm;
/* map 8 byte retval buffer to the device */ /* map 8 byte retval buffer to the device */
ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE); ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret); rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
...@@ -809,7 +817,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op) ...@@ -809,7 +817,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client); rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
/* map the message the first time we see it */ /* map the op the first time we see it */
if (!op->op_mapped) { if (!op->op_mapped) {
op->op_count = ib_dma_map_sg(ic->i_cm_id->device, op->op_count = ib_dma_map_sg(ic->i_cm_id->device,
op->op_sg, op->op_nents, (op->op_write) ? op->op_sg, op->op_nents, (op->op_write) ?
......
...@@ -308,6 +308,8 @@ struct rds_message { ...@@ -308,6 +308,8 @@ struct rds_message {
unsigned int m_used_sgs; unsigned int m_used_sgs;
unsigned int m_total_sgs; unsigned int m_total_sgs;
void *m_final_op;
struct { struct {
struct rm_atomic_op { struct rm_atomic_op {
int op_type; int op_type;
...@@ -421,7 +423,7 @@ struct rds_transport { ...@@ -421,7 +423,7 @@ struct rds_transport {
int (*xmit_cong_map)(struct rds_connection *conn, int (*xmit_cong_map)(struct rds_connection *conn,
struct rds_cong_map *map, unsigned long offset); struct rds_cong_map *map, unsigned long offset);
int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op); int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
int (*xmit_atomic)(struct rds_connection *conn, struct rds_message *rm); int (*xmit_atomic)(struct rds_connection *conn, struct rm_atomic_op *op);
int (*recv)(struct rds_connection *conn); int (*recv)(struct rds_connection *conn);
int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov, int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov,
size_t size); size_t size);
......
...@@ -252,6 +252,7 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -252,6 +252,7 @@ int rds_send_xmit(struct rds_connection *conn)
/* The transport either sends the whole rdma or none of it */ /* The transport either sends the whole rdma or none of it */
if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
rm->m_final_op = &rm->rdma;
ret = conn->c_trans->xmit_rdma(conn, &rm->rdma); ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
if (ret) if (ret)
break; break;
...@@ -263,10 +264,12 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -263,10 +264,12 @@ int rds_send_xmit(struct rds_connection *conn)
} }
if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
ret = conn->c_trans->xmit_atomic(conn, rm); rm->m_final_op = &rm->atomic;
ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
if (ret) if (ret)
break; break;
conn->c_xmit_atomic_sent = 1; conn->c_xmit_atomic_sent = 1;
/* The transport owns the mapped memory for now. /* The transport owns the mapped memory for now.
* You can't unmap it while it's on the send queue */ * You can't unmap it while it's on the send queue */
set_bit(RDS_MSG_MAPPED, &rm->m_flags); set_bit(RDS_MSG_MAPPED, &rm->m_flags);
...@@ -295,6 +298,7 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -295,6 +298,7 @@ int rds_send_xmit(struct rds_connection *conn)
} }
if (rm->data.op_active && !conn->c_xmit_data_sent) { if (rm->data.op_active && !conn->c_xmit_data_sent) {
rm->m_final_op = &rm->data;
ret = conn->c_trans->xmit(conn, rm, ret = conn->c_trans->xmit(conn, rm,
conn->c_xmit_hdr_off, conn->c_xmit_hdr_off,
conn->c_xmit_sg, conn->c_xmit_sg,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment