Commit b34accde authored by Titouan Soulard's avatar Titouan Soulard

libcapulet: make send function generic

parent afab2478
......@@ -209,13 +209,13 @@ int main(int argc, char *argv[]) {
return -1;
}
result = capulet_rdma_ib_send_read(&rdma_ctx, &out_info_packet, out_mr, allocated_size);
result = capulet_rdma_ib_post_send(&rdma_ctx, IBV_WR_RDMA_READ, out_mr, allocated_size, &out_info_packet);
if(!result) {
fprintf(stderr, "Sending Read failed\n");
return -1;
}
result = capulet_rdma_ib_send_write(&rdma_ctx, &in_info_packet, in_mr, allocated_size);
result = capulet_rdma_ib_post_send(&rdma_ctx, IBV_WR_RDMA_WRITE, in_mr, allocated_size, &in_info_packet);
if(!result) {
fprintf(stderr, "Sending Write failed\n");
return -1;
......
......@@ -21,8 +21,7 @@ struct ibv_mr *capulet_rdma_ib_create_mr(struct CapuletRdmaIbContext *ib_ctx, si
bool capulet_rdma_ib_fill_base_udp(struct CapuletRdmaIbContext *ctx, struct CapuletNetUdpQpInfoPacket *own);
bool capulet_rdma_ib_set_peer_from_udp(struct CapuletRdmaIbContext *ctx, struct CapuletNetUdpContext *udp_ctx);
bool capulet_rdma_ib_post_recv(struct CapuletRdmaIbContext *ctx, struct ibv_mr *ibv_dev_mr, int mr_size);
bool capulet_rdma_ib_send_read(struct CapuletRdmaIbContext *ctx, struct CapuletNetUdpMrInfoPacket *remote_mr, struct ibv_mr *local_mr, int local_mr_size);
bool capulet_rdma_ib_send_write(struct CapuletRdmaIbContext *ctx, struct CapuletNetUdpMrInfoPacket *remote_mr, struct ibv_mr *local_mr, int local_mr_size);
bool capulet_rdma_ib_post_send(struct CapuletRdmaIbContext *ctx, enum ibv_wr_opcode opcode, struct ibv_mr *local_mr, int local_mr_size, struct CapuletNetUdpMrInfoPacket *remote_mr);
void capulet_rdma_ib_free_mr(struct ibv_mr *mr);
void capulet_rdma_ib_free(struct CapuletRdmaIbContext *ctx);
......@@ -211,7 +211,7 @@ bool capulet_rdma_ib_post_recv(struct CapuletRdmaIbContext *ctx, struct ibv_mr *
return ibv_dev_bad_wr == NULL;
}
bool capulet_rdma_ib_send_read(struct CapuletRdmaIbContext *ctx, struct CapuletNetUdpMrInfoPacket *remote_mr, struct ibv_mr *local_mr, int local_mr_size) {
bool capulet_rdma_ib_post_send(struct CapuletRdmaIbContext *ctx, enum ibv_wr_opcode opcode, struct ibv_mr *local_mr, int local_mr_size, struct CapuletNetUdpMrInfoPacket *remote_mr) {
struct ibv_send_wr ibv_dev_rdma_wr;
struct ibv_sge ibv_dev_sge;
struct ibv_send_wr *ibv_dev_bad_wr = NULL;
......@@ -228,45 +228,14 @@ bool capulet_rdma_ib_send_read(struct CapuletRdmaIbContext *ctx, struct CapuletN
ibv_dev_rdma_wr.wr_id = (uint64_t) lrand48();
ibv_dev_rdma_wr.sg_list = &ibv_dev_sge;
ibv_dev_rdma_wr.num_sge = 1;
ibv_dev_rdma_wr.opcode = IBV_WR_RDMA_READ;
ibv_dev_rdma_wr.opcode = opcode;
ibv_dev_rdma_wr.send_flags = IBV_SEND_SIGNALED;
ibv_dev_rdma_wr.wr.rdma.remote_addr = (uintptr_t) remote_mr->addr;
ibv_dev_rdma_wr.wr.rdma.rkey = remote_mr->key;
result = ibv_post_send(ctx->qp, &ibv_dev_rdma_wr, &ibv_dev_bad_wr);
if(result) {
perror("Could not post Send request");
return false;
if(opcode == IBV_WR_RDMA_READ || opcode == IBV_WR_RDMA_WRITE) {
ibv_dev_rdma_wr.wr.rdma.remote_addr = (uintptr_t) remote_mr->addr;
ibv_dev_rdma_wr.wr.rdma.rkey = remote_mr->key;
}
return ibv_dev_bad_wr == NULL;
}
bool capulet_rdma_ib_send_write(struct CapuletRdmaIbContext *ctx, struct CapuletNetUdpMrInfoPacket *remote_mr, struct ibv_mr *local_mr, int local_mr_size) {
struct ibv_send_wr ibv_dev_rdma_wr;
struct ibv_sge ibv_dev_sge;
struct ibv_send_wr *ibv_dev_bad_wr = NULL;
int result;
memset(&ibv_dev_sge, 0, sizeof(struct ibv_sge));
memset(&ibv_dev_rdma_wr, 0, sizeof(struct ibv_send_wr));
ibv_dev_sge.addr = (uintptr_t) local_mr->addr;
ibv_dev_sge.length = local_mr_size;
ibv_dev_sge.lkey = local_mr->lkey;
ibv_dev_rdma_wr.wr_id = (uint64_t) lrand48();
ibv_dev_rdma_wr.sg_list = &ibv_dev_sge;
ibv_dev_rdma_wr.num_sge = 1;
ibv_dev_rdma_wr.opcode = IBV_WR_RDMA_WRITE;
ibv_dev_rdma_wr.send_flags = IBV_SEND_SIGNALED;
ibv_dev_rdma_wr.wr.rdma.remote_addr = (uintptr_t) remote_mr->addr;
ibv_dev_rdma_wr.wr.rdma.rkey = remote_mr->key;
result = ibv_post_send(ctx->qp, &ibv_dev_rdma_wr, &ibv_dev_bad_wr);
if(result) {
......
......@@ -12,7 +12,9 @@ void *serve_mr_td_fn(void *raw_ctx) {
int trx_rdma_start(TRXState *s, const TRXDriverParams2 *p) {
struct SDRContext *sdr_context;
struct SDRServeMrThreadContext serve_mr_td_ctx;
// XXX: to be freed!
struct SDRServeMrThreadContext *serve_mr_td_ctx;
serve_mr_td_ctx = malloc(sizeof(struct SDRServeMrThreadContext));
struct ibv_mr *in_mr;
struct ibv_mr *out_mr;
......@@ -90,16 +92,18 @@ int trx_rdma_start(TRXState *s, const TRXDriverParams2 *p) {
if(!result || (strcmp(sdr_context->in_remote.name, "in") != 0)) return -1;
capulet_net_udp_query_mr(server_socket, server_infos, "out", &sdr_context->out_remote);
if(!result || (strcmp(sdr_context->out_remote.name, "out") != 0)) return -1;
freeaddrinfo(server_infos);
} else {
// XXX: check ownership
serve_mr_td_ctx.mr_mgr = &sdr_context->mr_mgr;
serve_mr_td_ctx.server_socket = server_socket;
// Ownership check: `mr_mgr` will be destroyed in end function
// after thread, as well as `serve_mr_td_ctx` and server socket
// is fully transfered to thread.
serve_mr_td_ctx->mr_mgr = &sdr_context->mr_mgr;
serve_mr_td_ctx->server_socket = server_socket;
pthread_create(&serve_mr_td, NULL, *serve_mr_td_fn, (void *) &serve_mr_td_ctx);
capulet_net_udp_serve_mr(&sdr_context->mr_mgr, server_socket);
pthread_create(&serve_mr_td, NULL, *serve_mr_td_fn, (void *) serve_mr_td_ctx);
}
freeaddrinfo(server_infos);
return 0;
}
......@@ -123,7 +127,7 @@ void trx_rdma_write(TRXState *s, trx_timestamp_t timestamp, const void **samples
// If client, send local RDMA buffer to server
if(sdr_context->server_addr) {
capulet_rdma_ib_send_write(&sdr_context->ib_ctx, &sdr_context->in_remote, out_local_mr, byte_count + sizeof(struct SDRMetadata));
capulet_rdma_ib_post_send(&sdr_context->ib_ctx, IBV_WR_RDMA_WRITE, out_local_mr, byte_count + sizeof(struct SDRMetadata), &sdr_context->in_remote);
}
}
......@@ -146,7 +150,7 @@ int trx_rdma_read(TRXState *s, trx_timestamp_t *ptimestamp, void **psamples, int
// If client, send Read request and poll for completion
if(sdr_context->server_addr) {
capulet_rdma_ib_send_read(&sdr_context->ib_ctx, &sdr_context->out_remote, in_local_mr, byte_count + sizeof(struct SDRMetadata));
capulet_rdma_ib_post_send(&sdr_context->ib_ctx, IBV_WR_RDMA_READ, in_local_mr, byte_count + sizeof(struct SDRMetadata), &sdr_context->out_remote);
do {
result = ibv_poll_cq(sdr_context->ib_ctx.cq, 1, &poll_wc);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment