Commit 941f8d55 authored by Santosh Shilimkar's avatar Santosh Shilimkar

RDS: RDMA: Fix the composite message user notification

When application sends an RDS RDMA composite message consist of
RDMA transfer to be followed up by non RDMA payload, it expect to
be notified *only* when the full message gets delivered. RDS RDMA
notification doesn't behave this way though.

Thanks to Venkat for debug and root casuing the issue
where only first part of the message(RDMA) was
successfully delivered but remainder payload delivery failed.
In that case, application should not be notified with
a false positive of message delivery success.

Fix this case by making sure the user gets notified only after
the full message delivery.
Reviewed-by: default avatarVenkat Venkatsubra <venkat.x.venkatsubra@oracle.com>
Signed-off-by: default avatarSantosh Shilimkar <santosh.shilimkar@oracle.com>
parent be2f76ea
...@@ -69,16 +69,6 @@ static void rds_ib_send_complete(struct rds_message *rm, ...@@ -69,16 +69,6 @@ static void rds_ib_send_complete(struct rds_message *rm,
complete(rm, notify_status); complete(rm, notify_status);
} }
static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
struct rm_data_op *op,
int wc_status)
{
if (op->op_nents)
ib_dma_unmap_sg(ic->i_cm_id->device,
op->op_sg, op->op_nents,
DMA_TO_DEVICE);
}
static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic, static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
struct rm_rdma_op *op, struct rm_rdma_op *op,
int wc_status) int wc_status)
...@@ -139,6 +129,21 @@ static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic, ...@@ -139,6 +129,21 @@ static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic,
rds_ib_stats_inc(s_ib_atomic_fadd); rds_ib_stats_inc(s_ib_atomic_fadd);
} }
static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
struct rm_data_op *op,
int wc_status)
{
struct rds_message *rm = container_of(op, struct rds_message, data);
if (op->op_nents)
ib_dma_unmap_sg(ic->i_cm_id->device,
op->op_sg, op->op_nents,
DMA_TO_DEVICE);
if (rm->rdma.op_active && rm->data.op_notify)
rds_ib_send_unmap_rdma(ic, &rm->rdma, wc_status);
}
/* /*
* Unmap the resources associated with a struct send_work. * Unmap the resources associated with a struct send_work.
* *
......
...@@ -627,6 +627,16 @@ int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm, ...@@ -627,6 +627,16 @@ int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
} }
op->op_notifier->n_user_token = args->user_token; op->op_notifier->n_user_token = args->user_token;
op->op_notifier->n_status = RDS_RDMA_SUCCESS; op->op_notifier->n_status = RDS_RDMA_SUCCESS;
/* Enable rmda notification on data operation for composite
* rds messages and make sure notification is enabled only
* for the data operation which follows it so that application
* gets notified only after full message gets delivered.
*/
if (rm->data.op_sg) {
rm->rdma.op_notify = 0;
rm->data.op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
}
} }
/* The cookie contains the R_Key of the remote memory region, and /* The cookie contains the R_Key of the remote memory region, and
......
...@@ -419,6 +419,7 @@ struct rds_message { ...@@ -419,6 +419,7 @@ struct rds_message {
} rdma; } rdma;
struct rm_data_op { struct rm_data_op {
unsigned int op_active:1; unsigned int op_active:1;
unsigned int op_notify:1;
unsigned int op_nents; unsigned int op_nents;
unsigned int op_count; unsigned int op_count;
unsigned int op_dmasg; unsigned int op_dmasg;
......
...@@ -476,12 +476,14 @@ void rds_rdma_send_complete(struct rds_message *rm, int status) ...@@ -476,12 +476,14 @@ void rds_rdma_send_complete(struct rds_message *rm, int status)
struct rm_rdma_op *ro; struct rm_rdma_op *ro;
struct rds_notifier *notifier; struct rds_notifier *notifier;
unsigned long flags; unsigned long flags;
unsigned int notify = 0;
spin_lock_irqsave(&rm->m_rs_lock, flags); spin_lock_irqsave(&rm->m_rs_lock, flags);
notify = rm->rdma.op_notify | rm->data.op_notify;
ro = &rm->rdma; ro = &rm->rdma;
if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) && if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
ro->op_active && ro->op_notify && ro->op_notifier) { ro->op_active && notify && ro->op_notifier) {
notifier = ro->op_notifier; notifier = ro->op_notifier;
rs = rm->m_rs; rs = rm->m_rs;
sock_hold(rds_rs_to_sk(rs)); sock_hold(rds_rs_to_sk(rs));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment