Commit 610210c8 authored by Joanne Hugé's avatar Joanne Hugé

Update trx driver with DPDK

parent 48754dba
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2010-2014 Intel Corporation
CC=gcc -m64 -msse4.1 CC=gcc -m64 -msse4.1
CXX=g++ -m64 -msse4.1 CXX=g++ -m64 -msse4.1
CFLAGS=-O2 -fno-strict-aliasing CFLAGS=-O2 -fno-strict-aliasing -Wall -pedantic -std=gnu17
CFLAGS+=-D_GNU_SOURCE -D_FILE_OFFSET_BITS=64 -D_LARGEFILE_SOURCE CFLAGS+=-D_GNU_SOURCE -D_FILE_OFFSET_BITS=64 -D_LARGEFILE_SOURCE
CFLAGS+=-MMD -g CFLAGS+=-MMD -g
CXXFLAGS=$(CFLAGS) CXXFLAGS=$(CFLAGS)
PROGS= trx_ecpri.so # binary name
APP = trx_ecpri_dpdk
LIB = lib$(APP).so
all: $(PROGS) # all source are stored in SRCS-y
SRCS-y := $(APP).c
clean: PKGCONF ?= pkg-config
rm -f $(PROGS) *.lo *~ *.d
# Build using pkg-config variables if possible
ifneq ($(shell $(PKGCONF) --exists libdpdk && echo 0),0)
$(error "no installation of DPDK found")
endif
all: $(LIB)
$(LIB): $(APP).o
$(CC) -shared $(LDFLAGS) $(LDFLAGS_SHARED) -o $@ $<
trx_ecpri.so: trx_ecpri.lo .PHONY: shared
$(CC) -shared $(LDFLAGS) -o $@ $< -lm shared: build/$(APP)-shared
ln -sf $(APP)-shared build/$(APP)
%.lo: %.c PC_FILE := $(shell $(PKGCONF) --path libdpdk 2>/dev/null)
$(CC) $(CFLAGS) -fpic -c -o $@ $< CFLAGS += -O3 $(shell $(PKGCONF) --cflags libdpdk)
LDFLAGS_SHARED = $(shell $(PKGCONF) --libs libdpdk) -lpthread -lm
-include $(wildcard *.d) CFLAGS += -DALLOW_EXPERIMENTAL_API
build/$(APP)-shared: $(SRCS-y) Makefile $(PC_FILE) | build
$(CC) $(CFLAGS) $(SRCS-y) -o $@ $(LDFLAGS) $(LDFLAGS_SHARED)
%.o: %.c
$(CC) $(CFLAGS) $(SRCS-y) -fpic -c -o $@
build:
@mkdir -p $@
.PHONY: clean
clean:
rm -rf build
rm -rf $(LIB)
rm -f *.o *.d *.so
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2010-2014 Intel Corporation # Copyright(c) 2010-2014 Intel Corporation
CC=gcc -m64 -msse4.1
CXX=g++ -m64 -msse4.1
CFLAGS=-O2 -fno-strict-aliasing
CFLAGS+=-D_GNU_SOURCE -D_FILE_OFFSET_BITS=64 -D_LARGEFILE_SOURCE
CFLAGS+=-MMD -g
CXXFLAGS=$(CFLAGS)
# binary name # binary name
APP = dpdk-recv-send APP = dpdk-recv-send
...@@ -15,29 +22,29 @@ $(error "no installation of DPDK found") ...@@ -15,29 +22,29 @@ $(error "no installation of DPDK found")
endif endif
all: shared all: shared
.PHONY: shared static .PHONY: shared
shared: build/$(APP)-shared shared: build/$(APP)-shared
ln -sf $(APP)-shared build/$(APP) ln -sf $(APP)-shared build/$(APP)
static: build/$(APP)-static
ln -sf $(APP)-static build/$(APP) dpdk-recv-send.so: dpdk-recv-send.o
$(CC) -shared $(LDFLAGS) $(LDFLAGS_SHARED) -o $@ $<
PC_FILE := $(shell $(PKGCONF) --path libdpdk 2>/dev/null) PC_FILE := $(shell $(PKGCONF) --path libdpdk 2>/dev/null)
CFLAGS += -O3 $(shell $(PKGCONF) --cflags libdpdk) CFLAGS += -O3 $(shell $(PKGCONF) --cflags libdpdk)
LDFLAGS_SHARED = $(shell $(PKGCONF) --libs libdpdk) -lpthread LDFLAGS_SHARED = $(shell $(PKGCONF) --libs libdpdk) -lpthread -lm
LDFLAGS_STATIC = $(shell $(PKGCONF) --static --libs libdpdk)
CFLAGS += -DALLOW_EXPERIMENTAL_API CFLAGS += -DALLOW_EXPERIMENTAL_API
build/$(APP)-shared: $(SRCS-y) Makefile $(PC_FILE) | build build/$(APP)-shared: $(SRCS-y) Makefile $(PC_FILE) | build
$(CC) $(CFLAGS) $(SRCS-y) -o $@ $(LDFLAGS) $(LDFLAGS_SHARED) $(CC) $(CFLAGS) $(SRCS-y) -o $@ $(LDFLAGS) $(LDFLAGS_SHARED)
build/$(APP)-static: $(SRCS-y) Makefile $(PC_FILE) | build %.o: %.c
$(CC) $(CFLAGS) $(SRCS-y) -o $@ $(LDFLAGS) $(LDFLAGS_STATIC) $(CC) $(CFLAGS) $(SRCS-y) -fpic -c -o $@
build: build:
@mkdir -p $@ @mkdir -p $@
.PHONY: clean .PHONY: clean
clean: clean:
rm -f build/$(APP) build/$(APP)-static build/$(APP)-shared rm -rf build
test -d build && rmdir -p build || true rm -f *.o *.d *.so
#define _GNU_SOURCE /* See feature_test_macros(7) */ //#define _GNU_SOURCE /* See feature_test_macros(7) */
#include <arpa/inet.h> #include <arpa/inet.h>
#include <assert.h> #include <assert.h>
#include <errno.h> #include <errno.h>
...@@ -98,10 +98,9 @@ static int64_t calcdiff_ns(struct timespec t1, struct timespec t2) { ...@@ -98,10 +98,9 @@ static int64_t calcdiff_ns(struct timespec t1, struct timespec t2) {
return diff; return diff;
} }
#define DATA_SIZE 244
#define BURST_SIZE 16 #define BURST_SIZE 16
//#define NB_PACKETS (1 * 1000000)
#define NB_PACKETS 64
#define DATA_LEN 244
#define MEMPOOL_CACHE_SIZE 256 #define MEMPOOL_CACHE_SIZE 256
#define RTE_TEST_RX_DESC_DEFAULT 1024 #define RTE_TEST_RX_DESC_DEFAULT 1024
#define RTE_TEST_TX_DESC_DEFAULT 1024 #define RTE_TEST_TX_DESC_DEFAULT 1024
...@@ -110,8 +109,7 @@ static uint16_t nb_txd = RTE_TEST_TX_DESC_DEFAULT; ...@@ -110,8 +109,7 @@ static uint16_t nb_txd = RTE_TEST_TX_DESC_DEFAULT;
struct rte_mempool *mbuf_pool; struct rte_mempool *mbuf_pool;
struct rte_ether_addr s_addr = {{0xb8,0xce,0xf6,0x4b,0x00,0x22}}; struct rte_ether_addr s_addr = {{0xb8,0xce,0xf6,0x4b,0x00,0x22}};
struct rte_ether_addr d_addr = {{0xb8,0xce,0xf6,0x4b,0x00,0x23}}; struct rte_ether_addr d_addr = {{0xb8,0xce,0xf6,0x4b,0x00,0x23}};
uint8_t data[BURST_SIZE][DATA_LEN]; int8_t data[BURST_SIZE][DATA_SIZE];
static const struct rte_eth_conf port_conf_default = { static const struct rte_eth_conf port_conf_default = {
.rxmode = { .max_rx_pkt_len = RTE_ETHER_MAX_LEN } .rxmode = { .max_rx_pkt_len = RTE_ETHER_MAX_LEN }
}; };
...@@ -126,10 +124,6 @@ static inline int port_init(int portid, struct rte_mempool *mbuf_pool) { ...@@ -126,10 +124,6 @@ static inline int port_init(int portid, struct rte_mempool *mbuf_pool) {
if (retval != 0) if (retval != 0)
return retval; return retval;
//ret = rte_eth_dev_adjust_nb_rx_tx_desc(1, &nb_rxd, &nb_txd);
//if (ret < 0)
// rte_exit(EXIT_FAILURE, "Cannot adjust number of descriptors: err=%d, port=%u\n", ret, 1);
/* Allocate and set up 1 RX queue per Ethernet port. */ /* Allocate and set up 1 RX queue per Ethernet port. */
for (q = 0; q < rx_rings; q++) { for (q = 0; q < rx_rings; q++) {
retval = rte_eth_rx_queue_setup(portid, q, nb_rxd, retval = rte_eth_rx_queue_setup(portid, q, nb_rxd,
...@@ -176,7 +170,6 @@ static void init_dpdk(int argc, char ** argv) { ...@@ -176,7 +170,6 @@ static void init_dpdk(int argc, char ** argv) {
if (port_init(1, mbuf_pool) != 0) if (port_init(1, mbuf_pool) != 0)
rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n", 1); rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n", 1);
} }
static void send_packets(int port) { static void send_packets(int port) {
struct rte_mbuf * pkt[BURST_SIZE]; struct rte_mbuf * pkt[BURST_SIZE];
struct rte_ether_hdr *eth_hdr; struct rte_ether_hdr *eth_hdr;
...@@ -193,8 +186,8 @@ static void send_packets(int port) { ...@@ -193,8 +186,8 @@ static void send_packets(int port) {
eth_hdr->s_addr = s_addr; eth_hdr->s_addr = s_addr;
} }
eth_hdr->ether_type = 0x0a00; eth_hdr->ether_type = 0x0a00;
memcpy(rte_pktmbuf_mtod_offset(pkt[i], uint8_t *, sizeof(struct rte_ether_hdr)), data[i], DATA_LEN); memcpy(rte_pktmbuf_mtod_offset(pkt[i], uint8_t *, sizeof(struct rte_ether_hdr)), data[i], DATA_SIZE);
pkt_size = DATA_LEN + sizeof(struct rte_ether_hdr); pkt_size = DATA_SIZE + sizeof(struct rte_ether_hdr);
pkt[i]->data_len = pkt_size; pkt[i]->data_len = pkt_size;
pkt[i]->pkt_len = pkt_size; pkt[i]->pkt_len = pkt_size;
} }
...@@ -209,7 +202,6 @@ static void send_packets(int port) { ...@@ -209,7 +202,6 @@ static void send_packets(int port) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
static int recv_packets(int port) { static int recv_packets(int port) {
struct rte_mbuf * pkt[BURST_SIZE]; struct rte_mbuf * pkt[BURST_SIZE];
//if(port == 0) log_info("PORT0", "Init"); //if(port == 0) log_info("PORT0", "Init");
...@@ -244,7 +236,7 @@ void * send_thread(void * p) { ...@@ -244,7 +236,7 @@ void * send_thread(void * p) {
log_info("SEND_THREAD", "Init %d", *port); log_info("SEND_THREAD", "Init %d", *port);
for(int i = 0; i < BURST_SIZE; i++) { for(int i = 0; i < BURST_SIZE; i++) {
for(int j = 0; j < DATA_LEN; j++) { for(int j = 0; j < DATA_SIZE; j++) {
data[i][j] = 0xfe; data[i][j] = 0xfe;
} }
} }
......
...@@ -3,30 +3,28 @@ ...@@ -3,30 +3,28 @@
CC=gcc -m64 -msse4.1 CC=gcc -m64 -msse4.1
CXX=g++ -m64 -msse4.1 CXX=g++ -m64 -msse4.1
CFLAGS=-O2 -fno-strict-aliasing -Wall -pedantic #CFLAGS=-O2 -Wall -pedantic -std=gnu17
CFLAGS+=-D_GNU_SOURCE -D_FILE_OFFSET_BITS=64 -D_LARGEFILE_SOURCE CFLAGS=-O2 -fno-strict-aliasing -Wall -pedantic -std=gnu17
CFLAGS+=-MMD -g CFLAGS +=-D_GNU_SOURCE -D_FILE_OFFSET_BITS=64 -D_LARGEFILE_SOURCE
CFLAGS += -DALLOW_EXPERIMENTAL_API
CXXFLAGS=$(CFLAGS) CXXFLAGS=$(CFLAGS)
LIB=trx_ecpri_dpdk
LIBDIR=/root/ecpri/trx-ecpri-priv
LDFLAGS=-l$(LIB)
LDFLAGS+=-L/usr/local/lib/x86_64-linux-gnu -Wl,--as-needed -lrte_node -lrte_graph -lrte_bpf -lrte_flow_classify -lrte_pipeline -lrte_table -lrte_port -lrte_fib -lrte_ipsec -lrte_vhost -lrte_stack -lrte_security -lrte_sched -lrte_reorder -lrte_rib -lrte_regexdev -lrte_rawdev -lrte_pdump -lrte_power -lrte_member -lrte_lpm -lrte_latencystats -lrte_kni -lrte_jobstats -lrte_ip_frag -lrte_gso -lrte_gro -lrte_eventdev -lrte_efd -lrte_distributor -lrte_cryptodev -lrte_compressdev -lrte_cfgfile -lrte_bitratestats -lrte_bbdev -lrte_acl -lrte_timer -lrte_hash -lrte_metrics -lrte_cmdline -lrte_pci -lrte_ethdev -lrte_meter -lrte_net -lrte_mbuf -lrte_mempool -lrte_rcu -lrte_ring -lrte_eal -lrte_telemetry -lrte_kvargs -lbsd -lpthread -lm
SRCCLIENT=trx_ecpri.c client.c PROG=test-dpdk-ecpri
SRCSERVER=trx_ecpri.c server.c
OBJCLIENT=$(SRCCLIENT:%.c=%.o)
OBJSERVER=$(SRCSERVER:%.c=%.o)
CLIENT=client
SERVER=server
all: $(SERVER) $(CLIENT) all: $(PROG)
clean: $(PROG): $(PROG).o
rm -f $(SERVER) $(CLIENT) *.o *~ *.d $(CC) -o $@ $< $(LDFLAGS)
$(SERVER): $(OBJSERVER)
$(CC) $(LDFLAGS) -o $@ $^ -lm -lpthread
$(CLIENT): $(OBJCLIENT)
$(CC) $(LDFLAGS) -o $@ $^ -lm -lpthread
%.o: %.c %.o: %.c
$(CC) $(CFLAGS) -fpic -c -o $@ $< $(CC) $(CFLAGS) -c -o $@ $<
clean:
rm -f $(PROG) *.o *~ *.d
-include $(wildcard *.d) -include $(wildcard *.d)
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <error.h>
#include <fcntl.h>
#include <getopt.h>
#include <immintrin.h>
#include <inttypes.h>
#include <limits.h>
#include <linux/if_packet.h>
#include <math.h>
#include <netdb.h>
#include <netinet/ether.h>
#include <netinet/in.h>
#include <net/if.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <signal.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
typedef struct {
const char * re_mac;
const char * rec_mac;
const char * rec_if;
int recv_affinity;
int send_affinity;
int prepare_affinity;
int decompress_affinity;
int statistic_affinity;
int ecpri_period;
int flow_id;
int sample_rate;
} TRXEcpriState;
static void log_error(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " ERROR [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
puts(line);
exit(EXIT_FAILURE);
}
static void log_info(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " INFO [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
puts(line);
}
int startdpdk(TRXEcpriState * s);
int main(int argc, char * argv[]) {
(void) argc;
(void) argv;
TRXEcpriState *s;
s = malloc(sizeof(TRXEcpriState));
memset(s, 0, sizeof(*s));
#if 0
s->rec_mac = "80:fa:5b:92:39:c3";
s->re_mac = "00:e0:4c:90:20:d3";
s->rec_if = "enp53s0";
#else
s->rec_mac = "b8:ce:f6:4b:00:22";
s->re_mac = "b8:ce:f6:4b:00:23";
s->rec_if = "ens5f0np0";
#endif
s->recv_affinity = 39;
s->send_affinity = 38;
s->prepare_affinity = 37;
s->decompress_affinity = 36;
s->statistic_affinity = 35;
s->ecpri_period = 800;
s->flow_id = 0;
s->sample_rate = 122880000;
log_info("TEST-DPDK-ECPRI", "Starting test...\n");
log_info("TEST-DPDK-ECPRI", "rec-mac: %s, re-mac: %s, rec-if: %s", s->rec_mac, s->re_mac, s->rec_if);
startdpdk(s);
for(int i = 0; i < 1000; i++) {
sleep(1);
}
}
#!/bin/bash
cd ..;
make;
cd ecpri-tests;
make all;
./test-dpdk-ecpri
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <error.h>
#include <fcntl.h>
#include <getopt.h>
#include <immintrin.h>
#include <inttypes.h>
#include <limits.h>
#include <linux/if_packet.h>
#include <math.h>
#include <netdb.h>
#include <netinet/ether.h>
#include <netinet/in.h>
#include <net/if.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <signal.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_ether.h>
#include <rte_cycles.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <rte_ether.h>
#include <rte_ip.h>
#include <rte_udp.h>
#include "trx_driver.h"
#define DEBUG
#define SSE4 /* define if CPU supports SSE4.1 */
#include "private.c"
/* eCPRI Send and Recv */
#define N_SAMPLES 256
#define PACKET_SIZE 262
#define DATA_SIZE 244
#define FRAME_FREQ INT64_C(3840000)
#define SEND_LIMIT (1250 * 10)
#define TRX_WB_MAX_PARTS 1000
#define TRX_BUF_MAX_SIZE 1000
#define STATISTIC_REFRESH_RATE INT64_C(100 * 1000 * 1000)
static void log_error(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " ERROR [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
puts(line);
exit(EXIT_FAILURE);
}
static void log_info(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " INFO [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
puts(line);
}
#ifdef DEBUG
static void log_debug(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " DEBUG [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
puts(line);
}
#else
#define log_debug(...)
#endif
static int latency_target_fd = -1;
static int32_t latency_target_value = 0;
/* Latency trick
* if the file /dev/cpu_dma_latency exists,
* open it and write a zero into it. This will tell
* the power management system not to transition to
* a high cstate (in fact, the system acts like idle=poll)
* When the fd to /dev/cpu_dma_latency is closed, the behavior
* goes back to the system default.
*
* Documentation/power/pm_qos_interface.txt
*/
void set_latency_target(void) {
struct stat s;
int err;
errno = 0;
err = stat("/dev/cpu_dma_latency", &s);
if (err == -1) {
error(EXIT_FAILURE, errno, "WARN: stat /dev/cpu_dma_latency failed");
return;
}
errno = 0;
latency_target_fd = open("/dev/cpu_dma_latency", O_RDWR);
if (latency_target_fd == -1) {
error(EXIT_FAILURE, errno, "WARN: open /dev/cpu_dma_latency");
return;
}
errno = 0;
err = write(latency_target_fd, &latency_target_value, 4);
if (err < 1) {
error(EXIT_FAILURE, errno, "# error setting cpu_dma_latency to %d!",
latency_target_value);
close(latency_target_fd);
return;
}
printf("# /dev/cpu_dma_latency set to %dus\n", latency_target_value);
}
typedef struct {
volatile void * buffer;
char name[64];
size_t buf_len;
size_t len;
volatile int write_index;
volatile int read_index;
} ring_buffer_t;
typedef struct {
const char * re_mac;
const char * rec_mac;
const char * rec_if;
int recv_affinity;
int send_affinity;
int prepare_affinity;
int decompress_affinity;
int statistic_affinity;
int ecpri_period;
int flow_id;
int sample_rate;
} TRXEcpriState;
// Buffers
static ring_buffer_t rx_rbuf;
static ring_buffer_t trx_read_rbuf;
static ring_buffer_t tx_rbuf;
static ring_buffer_t trx_write_rbuf;
static volatile int trx_wb_part[TRX_WB_MAX_PARTS]; // TODO write next index instead of current
static volatile int64_t trx_wb_ts[TRX_WB_MAX_PARTS];
static int trx_wb_part_read_index;
static int trx_wb_part_write_index;
// Locks
pthread_mutex_t tx_mutex;
pthread_cond_t tx_cond;
pthread_mutex_t rx_mutex;
pthread_cond_t rx_cond;
pthread_mutex_t tx_ready_mutex;
pthread_cond_t tx_ready_cond;
sem_t trx_read_sem;
// Counters
static volatile int64_t prepared_frame_count; // compressed samples
static volatile int64_t read_frame_count; // frames passed to amarisoft stack
static volatile int64_t sent_frame_count; // frames sent to eRE
static volatile int64_t recv_frame_count; // frames received from eRE
// Computed values
static int rxtx_buf_size;
static int ecpri_period_mult;
// Network
static volatile int seq_id;
// Timestamps utils
#define NSEC_PER_SEC INT64_C(1000000000)
static struct timespec int_to_ts(int64_t t) {
struct timespec ts;
ts.tv_sec = t / NSEC_PER_SEC;
ts.tv_nsec = t - (ts.tv_sec * NSEC_PER_SEC);
return ts;
}
static int64_t ts_to_int(struct timespec ts) {
return ts.tv_sec * NSEC_PER_SEC + ts.tv_nsec;
}
static void add_ns(struct timespec *t, int64_t ns) {
t->tv_nsec += ns;
while (t->tv_nsec >= ((int64_t)NSEC_PER_SEC)) {
t->tv_sec += 1;
t->tv_nsec -= NSEC_PER_SEC;
}
}
static int64_t calcdiff_ns(struct timespec t1, struct timespec t2) {
int64_t diff;
diff = NSEC_PER_SEC * ((int)t1.tv_sec - (int)t2.tv_sec);
diff += ((int)t1.tv_nsec - (int)t2.tv_nsec);
return diff;
}
static void rbuf_update_write_index(ring_buffer_t * rbuf) {
rbuf->write_index = (rbuf->write_index + 1) % rbuf->buf_len;
}
static void rbuf_update_read_index(ring_buffer_t * rbuf) {
rbuf->read_index = (rbuf->read_index + 1) % rbuf->buf_len;
}
static int rbuf_read_amount(const ring_buffer_t * rbuf) {
return (rbuf->read_index + rbuf->buf_len - rbuf->write_index) % rbuf->buf_len;
}
static int rbuf_write_amount(const ring_buffer_t * rbuf) {
return (rbuf->write_index + rbuf->buf_len - rbuf->read_index) % rbuf->buf_len;
}
#define RBUF_READ(rbuf, type) (((type *) rbuf.buffer) + (rbuf.read_index * rbuf.len))
#define RBUF_WRITE(rbuf, type) (((type *) rbuf.buffer) + (rbuf.write_index * rbuf.len))
#define RBUF_INIT(rbuf, _name, _buf_len, _len, type) do\
{\
log_debug("TRX_ECPRI", "Allocating %s with %d bytes\n", _name, (_buf_len * _len));\
rbuf.buffer = (type *) malloc(_buf_len * _len);\
strcpy(rbuf.name, _name);\
rbuf.buf_len = _buf_len;\
rbuf.len = _len;\
rbuf.write_index = 0;\
rbuf.read_index = 0;\
} while(0)
/* DPDK */
#define BURST_SIZE 16
#define MEMPOOL_CACHE_SIZE 256
#define RTE_TEST_RX_DESC_DEFAULT 1024
#define RTE_TEST_TX_DESC_DEFAULT 1024
static uint16_t nb_rxd = RTE_TEST_RX_DESC_DEFAULT;
static uint16_t nb_txd = RTE_TEST_TX_DESC_DEFAULT;
struct rte_mempool *mbuf_pool;
struct rte_ether_addr s_addr;
struct rte_ether_addr d_addr;
int8_t data[BURST_SIZE][DATA_SIZE];
static const struct rte_eth_conf port_conf_default = {
.rxmode = { .max_rx_pkt_len = RTE_ETHER_MAX_LEN }
};
static inline int port_init(int portid, struct rte_mempool *mbuf_pool) {
struct rte_eth_conf port_conf = port_conf_default;
const uint16_t rx_rings = 1, tx_rings = 1;
int retval;
uint16_t q;
retval = rte_eth_dev_configure(portid, rx_rings, tx_rings, &port_conf);
if (retval != 0)
return retval;
/* Allocate and set up 1 RX queue per Ethernet port. */
for (q = 0; q < rx_rings; q++) {
retval = rte_eth_rx_queue_setup(portid, q, nb_rxd,
rte_eth_dev_socket_id(portid), NULL, mbuf_pool);
if (retval < 0)
return retval;
}
/* Allocate and set up 1 TX queue per Ethernet port. */
for (q = 0; q < tx_rings; q++) {
retval = rte_eth_tx_queue_setup(portid, q, nb_txd,
rte_eth_dev_socket_id(portid), NULL);
if (retval < 0)
return retval;
}
/* Start the Ethernet port. */
retval = rte_eth_dev_start(portid);
if (retval < 0)
return retval;
return 0;
}
static void init_dpdk(int argc, char ** argv) {
unsigned int nb_mbufs;
int ret;
ret = rte_eal_init(argc, argv);
if (ret < 0)
rte_exit(EXIT_FAILURE, "initlize fail!");
argc -= ret;
argv += ret;
nb_mbufs = RTE_MAX((nb_rxd + nb_txd + BURST_SIZE + MEMPOOL_CACHE_SIZE), 8192U);
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", nb_mbufs,
MEMPOOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool == NULL)
rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
if (port_init(0, mbuf_pool) != 0)
rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n", 0);
if (port_init(1, mbuf_pool) != 0)
rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n", 1);
}
static void send_packets(int port) {
struct rte_mbuf * pkt[BURST_SIZE];
struct rte_ether_hdr *eth_hdr;
for(int i = 0; i < BURST_SIZE; i++) {
int pkt_size;
pkt[i] = rte_pktmbuf_alloc(mbuf_pool);
eth_hdr = rte_pktmbuf_mtod(pkt[i], struct rte_ether_hdr*);
if(port) {
eth_hdr->d_addr = s_addr;
eth_hdr->s_addr = d_addr;
} else {
eth_hdr->d_addr = d_addr;
eth_hdr->s_addr = s_addr;
}
eth_hdr->ether_type = 0xaefe;
memcpy(rte_pktmbuf_mtod_offset(pkt[i], uint8_t *, sizeof(struct rte_ether_hdr)), data[i], DATA_SIZE);
pkt_size = DATA_SIZE + sizeof(struct rte_ether_hdr);
pkt[i]->data_len = pkt_size;
pkt[i]->pkt_len = pkt_size;
}
const uint16_t nb_tx = rte_eth_tx_burst(port, 0, pkt, BURST_SIZE);
/* Free any unsent packets. */
if (unlikely(nb_tx < BURST_SIZE)) {
uint16_t buf;
for (buf = nb_tx; buf < BURST_SIZE; buf++)
rte_pktmbuf_free(pkt[buf]);
fprintf(stderr, "Sent %d packets instead of %d\n", nb_tx, BURST_SIZE);
exit(EXIT_FAILURE);
}
}
// TODO store received packets' data in buffer
static int recv_packets(int port) {
struct rte_mbuf * pkt[BURST_SIZE];
while(1) {
const int nb_rx = rte_eth_rx_burst(port, 0, pkt, BURST_SIZE);
for(int i = 0; i < nb_rx; i++)
rte_pktmbuf_free(pkt[i]);
if(nb_rx)
return nb_rx;
}
}
/* DPDK */
static void *recv_thread(void *p) {
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
log_info("RECV_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->recv_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->recv_affinity);
for(;;) {
recv_frame_count += recv_packets(0);
for(int j = 0; j < ecpri_period_mult; j++) {
// TODO write rx_buf
rbuf_update_write_index(&rx_rbuf);
}
pthread_mutex_lock(&rx_mutex);
pthread_cond_signal(&rx_cond);
pthread_mutex_unlock(&rx_mutex);
}
pthread_exit(EXIT_SUCCESS);
}
static void *send_thread(void *p) {
cpu_set_t mask;
struct timespec initial, next;
TRXEcpriState * s = (TRXEcpriState *) p;
log_info("SEND_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->send_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->send_affinity);
pthread_mutex_lock(&tx_ready_mutex);
pthread_cond_wait(&tx_ready_cond, &tx_ready_mutex);
pthread_mutex_unlock(&tx_ready_mutex);
clock_gettime(CLOCK_TAI, &initial);
for(int64_t i = 1;; i++) {
#ifdef DEBUG
if(i > SEND_LIMIT) {
int64_t d;
clock_gettime(CLOCK_TAI, &next);
d = calcdiff_ns(next, initial);
log_debug("SEND_THREAD", "Packets sent: %" PRIi64, sent_frame_count);
log_debug("SEND_THREAD", "Duration: %" PRIi64, d);
log_debug("SEND_THREAD", "ecpri_period_mult: %" PRIi64, ecpri_period_mult);
log_debug("SEND_THREAD", "FRAME_FREQ: %" PRIi64, FRAME_FREQ);
exit(EXIT_SUCCESS);
}
#endif
next = initial;
// Multiply by i everytime to prevent any frequence drift
add_ns(&next, (ecpri_period_mult * NSEC_PER_SEC * i) / FRAME_FREQ);
for(int j = 0; j < (ecpri_period_mult / BURST_SIZE); j++) {
for(int k = 0; k < BURST_SIZE; k++) {
memcpy(data[k], RBUF_READ(tx_rbuf, uint8_t), tx_rbuf.len);
rbuf_update_read_index(&tx_rbuf);
}
send_packets(0);
sent_frame_count += BURST_SIZE;
}
pthread_mutex_lock(&tx_mutex);
pthread_cond_signal(&tx_cond);
pthread_mutex_unlock(&tx_mutex);
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
}
pthread_exit(EXIT_SUCCESS);
}
static void *prepare_thread(void *p) {
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
int tx_ready_buffer_full = 0;
log_info("PREPARE_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->prepare_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->prepare_affinity);
for(int64_t i = 0;; i++) {
int16_t samples_int[N_SAMPLES];
// If we have frames to prepare
int n = rbuf_write_amount(&tx_rbuf);
if((i == 0) || n) {
// If there are frames from trx_write callback to prepare
if(rbuf_read_amount(&trx_write_rbuf)) {
int64_t ts = trx_wb_ts[trx_wb_part_read_index];
int empty_frames_ahead = ts - prepared_frame_count;
empty_frames_ahead = empty_frames_ahead < n ? empty_frames_ahead : n;
if(empty_frames_ahead > 0) {
for(int j = 0; j < empty_frames_ahead; j++) {
*((uint16_t *) (RBUF_WRITE(tx_rbuf, uint8_t) + 20)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf);
prepared_frame_count++;
}
}
else if (empty_frames_ahead == 0) {
int m = trx_wb_part[(trx_wb_part_read_index + 1) % TRX_WB_MAX_PARTS] - trx_write_rbuf.read_index;
m = m < n ? m : n;
for(int j = 0; j < m; j++) {
float * const trx_samples = RBUF_READ(trx_write_rbuf, float);
uint8_t * const tx_frame = RBUF_WRITE(tx_rbuf, uint8_t);
memset(samples_int, 0, 512);
float_to_int16(samples_int, trx_samples, N_SAMPLES, 32767);
encode_bf1(tx_frame + 22 , samples_int);
encode_bf1(tx_frame + 22 + 60 , samples_int + 64);
encode_bf1(tx_frame + 22 + 120, samples_int + 128);
encode_bf1(tx_frame + 22 + 180, samples_int + 192);
*((uint16_t *)(tx_frame + 20)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf);
rbuf_update_read_index(&trx_write_rbuf);
prepared_frame_count++;
}
if(m == 0)
trx_wb_part_read_index = (trx_wb_part_read_index + 1) % TRX_WB_MAX_PARTS;
}
else {
log_error("PREPARE_THREAD", "missed trx_write timestamp");
}
}
else {
*((uint16_t *) (RBUF_WRITE(tx_rbuf, uint8_t) + 20)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf);
prepared_frame_count++;
}
}
else {
if (!tx_ready_buffer_full) {
tx_ready_buffer_full = 1;
pthread_mutex_lock(&tx_ready_mutex);
pthread_cond_signal(&tx_ready_cond);
pthread_mutex_unlock(&tx_ready_mutex);
}
pthread_mutex_lock(&tx_mutex);
pthread_cond_wait(&tx_cond, &tx_mutex);
pthread_mutex_unlock(&tx_mutex);
}
}
pthread_exit(EXIT_SUCCESS);
}
static void *decompress_thread(void *p) {
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
int rx_ready = 0;
const float mult = 1. / 32767.;
log_info("DECOMPRESS_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->decompress_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->decompress_affinity);
for(;;) {
int n = rbuf_read_amount(&rx_rbuf);
if(n) {
for(int j = 0; j < n; j++) {
int16_t samples_int[N_SAMPLES];
const uint8_t * rx_samples = RBUF_READ(rx_rbuf, uint8_t) + 22;
// TODO : analyze seq_id, ecpri packet type etc... ?
// TODO : set rx_ready at some point (when ?)
if(rx_ready) {
memset(samples_int, 0, 512);
decode_bf1(samples_int , rx_samples , 16);
decode_bf1(samples_int + 64 , rx_samples + 60, 16);
decode_bf1(samples_int + 128, rx_samples + 120, 16);
decode_bf1(samples_int + 192, rx_samples + 180, 16);
int16_to_float(RBUF_WRITE(trx_read_rbuf, float), samples_int, N_SAMPLES, mult);
rbuf_update_read_index(&rx_rbuf);
rbuf_update_write_index(&trx_read_rbuf);
sem_post(&trx_read_sem);
}
}
}
else {
pthread_mutex_lock(&rx_mutex);
pthread_cond_wait(&rx_cond, &rx_mutex);
pthread_mutex_unlock(&rx_mutex);
}
}
pthread_exit(EXIT_SUCCESS);
}
static void *statistic_thread(void *p) {
struct timespec next, initial;
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
log_info("STATISTIC_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->statistic_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->statistic_affinity);
clock_gettime(CLOCK_TAI, &initial);
next = initial;
for(;;) {
add_ns(&next, STATISTIC_REFRESH_RATE);
log_info("STATS", "%14" PRIi64 " - %14" PRIi64 " - %14" PRIi64 " - %14" PRIi64,
prepared_frame_count,
read_frame_count,
sent_frame_count,
recv_frame_count);
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
}
pthread_exit(EXIT_SUCCESS);
}
static int start_threads(TRXEcpriState * s) {
pthread_t recv_pthread;
pthread_t send_pthread;
pthread_t prepare_pthread;
pthread_t decompress_pthread;
pthread_t statistic_pthread;
struct sched_param recv_param;
struct sched_param send_param;
struct sched_param prepare_param;
struct sched_param decompress_param;
struct sched_param statistic_param;
pthread_attr_t recv_attr;
pthread_attr_t send_attr;
pthread_attr_t prepare_attr;
pthread_attr_t decompress_attr;
pthread_attr_t statistic_attr;
log_info("TRX_ECPRI", "Starting threads");
// Initialize pthread attributes (default values)
if (pthread_attr_init(&recv_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
// Set a specific stack size
if (pthread_attr_setstacksize(&recv_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
// Set scheduler policy and priority of pthread
if (pthread_attr_setschedpolicy(&recv_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
recv_param.sched_priority = 97;
if (pthread_attr_setschedparam(&recv_attr, &recv_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
/* Use scheduling parameters of attr */
if (pthread_attr_setinheritsched(&recv_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&send_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&send_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&send_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
send_param.sched_priority = 97;
if (pthread_attr_setschedparam(&send_attr, &send_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&send_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&prepare_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&prepare_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&prepare_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
prepare_param.sched_priority = 97;
if (pthread_attr_setschedparam(&prepare_attr, &prepare_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&prepare_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&decompress_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&decompress_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&decompress_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
decompress_param.sched_priority = 97;
if (pthread_attr_setschedparam(&decompress_attr, &decompress_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&decompress_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&statistic_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&statistic_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&statistic_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
statistic_param.sched_priority = 97;
if (pthread_attr_setschedparam(&statistic_attr, &statistic_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&statistic_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_create(&recv_pthread, NULL, recv_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create recv thread");
if (pthread_create(&send_pthread, NULL, send_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create send thread");
if (pthread_create(&prepare_pthread, NULL, prepare_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create prepare thread");
if (pthread_create(&decompress_pthread, NULL, decompress_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create decompress thread");
if (pthread_create(&statistic_pthread, NULL, statistic_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create statistic thread");
return 0;
}
int startdpdk(TRXEcpriState * s) {
uint8_t ecpri_message[DATA_SIZE];
#define ARGC 9
#define ARGC_LEN 256
char _argv[ARGC][ARGC_LEN] = {
"",
"-l",
"28",
"-b",
"0000:04:00.0",
"-b",
"0000:5e:00.0",
"-b",
"0000:5e:00.1",
};
char ** argv = (char **) malloc(sizeof(char *) * ARGC);
for(int i = 0; i < ARGC; i++) {
argv[i] = (char *) malloc(ARGC_LEN);
strcpy(argv[i], _argv[i]);
}
init_dpdk(ARGC, argv);
#undef ARGC
#undef ARGC_LEN
log_debug("TRX_ECPRI", "start");
//set_latency_target();
seq_id = 0;
read_frame_count = 0;
sent_frame_count = 0;
prepared_frame_count = 0;
ecpri_period_mult = (s->ecpri_period * FRAME_FREQ) / 1000000;
rxtx_buf_size = (3 * ecpri_period_mult);
RBUF_INIT(rx_rbuf, "RX ring buffer", rxtx_buf_size, DATA_SIZE, uint8_t);
RBUF_INIT(tx_rbuf, "TX ring buffer", rxtx_buf_size, DATA_SIZE, uint8_t);
RBUF_INIT(trx_read_rbuf, "TRXRead ring buffer", TRX_BUF_MAX_SIZE, N_SAMPLES, float);
RBUF_INIT(trx_write_rbuf, "TRXWrite ring buffer", TRX_BUF_MAX_SIZE, N_SAMPLES, float);
trx_wb_part_read_index = 0;
trx_wb_part_write_index = 0;
pthread_mutex_init(&tx_mutex, NULL);
pthread_mutex_init(&rx_mutex, NULL);
pthread_mutex_init(&tx_ready_mutex, NULL);
pthread_cond_init(&tx_cond, NULL);
pthread_cond_init(&rx_cond, NULL);
pthread_cond_init(&tx_ready_cond, NULL);
sem_init(&trx_read_sem, 0, 0);
memset((uint8_t *) ecpri_message, 0, DATA_SIZE);
if(sscanf(s->re_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c",
&d_addr.addr_bytes[0],
&d_addr.addr_bytes[1],
&d_addr.addr_bytes[2],
&d_addr.addr_bytes[3],
&d_addr.addr_bytes[4],
&d_addr.addr_bytes[5]) != 6)
fprintf(stderr, "Invalid eRE MAC address\n");
if(sscanf(s->rec_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c",
&s_addr.addr_bytes[0],
&s_addr.addr_bytes[1],
&s_addr.addr_bytes[2],
&s_addr.addr_bytes[3],
&s_addr.addr_bytes[4],
&s_addr.addr_bytes[5]) != 6)
fprintf(stderr, "Invalid eREC MAC address\n");
/* Standard Header */
ecpri_message[0] = 0x10; // Protocol data revision 0x1, C = 0
// Message type = 0x00, IQ data
// Payload size
*((uint16_t *) (ecpri_message + 2)) = htons(DATA_SIZE);
*((uint16_t *) (ecpri_message + 4)) = htons(s->flow_id);
for(int i = 0; i < rxtx_buf_size; i++)
memcpy(((uint8_t *) tx_rbuf.buffer) + (i * tx_rbuf.len), ecpri_message, tx_rbuf.len);
start_threads(s);
return 0;
}
static void trx_ecpri_end(TRXState *s1)
{
log_info("TRX_ECPRI", "End");
TRXEcpriState *s = s1->opaque;
free(s);
}
static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void **__samples, int count, int tx_port_index, TRXWriteMetadata *md)
{
(void) s1;
float ** _samples = (float **) __samples;
int write_count = count >> 5;
int64_t ts = timestamp >> 5;
trx_wb_part[trx_wb_part_write_index] = trx_write_rbuf.write_index;
trx_wb_ts[trx_wb_part_write_index] = ts;
for(int k = 0; k < write_count; k++) {
for(int i = 0; i < 4; i++)
for(int j = 0; j < 64; j++)
RBUF_WRITE(trx_write_rbuf, float)[i * 64 + j] = _samples[i][j + (k << 6)];
rbuf_update_write_index(&trx_write_rbuf);
}
trx_wb_part_write_index = (trx_wb_part_write_index + 1) % TRX_WB_MAX_PARTS;
trx_wb_part[trx_wb_part_write_index] = trx_write_rbuf.write_index + write_count;
}
static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__samples, int count, int rx_port_index, TRXReadMetadata *md)
{
(void) s1;
float ** _samples = (float **) __samples;
int read_count = count >> 5;
for(int k = 0; k < read_count; k++) {
float * trx_samples;
sem_wait(&trx_read_sem);
trx_samples = RBUF_READ(trx_read_rbuf, float);
for(int i = 0; i < 4; i++)
for(int j = 0; j < 64; j++)
_samples[i][j + (k << 6)] = trx_samples[i * 64 + j];
rbuf_update_read_index(&trx_read_rbuf);
}
*ptimestamp = read_frame_count << 5;
read_frame_count += read_count;
return count;
}
/* This function can be used to automatically set the sample
rate. Here we don't implement it, so the user has to force a given
sample rate with the "sample_rate" configuration option */
static int trx_ecpri_get_sample_rate(TRXState *s1, TRXFraction *psample_rate,
int *psample_rate_num, int sample_rate_min)
{
return -1;
}
static int trx_ecpri_start(TRXState *s1, const TRXDriverParams *params)
{
TRXEcpriState *s = s1->opaque;
s->sample_rate = params->sample_rate[0].num / params->sample_rate[0].den;
startdpdk(s);
return 0;
}
int trx_driver_init(TRXState *s1)
{
TRXEcpriState *s;
double val;
// Lock all current and future pages from preventing of being paged to
// swap
if (mlockall(MCL_CURRENT | MCL_FUTURE)) {
log_error("TRX_ECPRI", "mlockall failed");
}
log_info("TRX_ECPRI", "Init");
if (s1->trx_api_version != TRX_API_VERSION) {
fprintf(stderr, "ABI compatibility mismatch between LTEENB and TRX driver (LTEENB ABI version=%d, TRX driver ABI version=%d)\n",
s1->trx_api_version, TRX_API_VERSION);
return -1;
}
s = malloc(sizeof(TRXEcpriState));
memset(s, 0, sizeof(*s));
trx_get_param_double(s1, &val, "recv_affinity");
s->recv_affinity = (int) val;
trx_get_param_double(s1, &val, "send_affinity");
s->send_affinity = (int) val;
trx_get_param_double(s1, &val, "prepare_affinity");
s->prepare_affinity = (int) val;
trx_get_param_double(s1, &val, "decompress_affinity");
s->decompress_affinity = (int) val;
trx_get_param_double(s1, &val, "statistic_affinity");
s->statistic_affinity = (int) val;
trx_get_param_double(s1, &val, "flow_id");
s->flow_id = (int) val;
trx_get_param_double(s1, &val, "ecpri_period");
s->ecpri_period = (int) val;
s->re_mac = trx_get_param_string(s1, "re_mac");
s->rec_mac = trx_get_param_string(s1, "rec_mac");
s->rec_if = trx_get_param_string(s1, "rec_if");
s1->opaque = s;
s1->trx_end_func = trx_ecpri_end;
s1->trx_write_func2 = trx_ecpri_write;
s1->trx_read_func2 = trx_ecpri_read;
s1->trx_start_func = trx_ecpri_start;
s1->trx_get_sample_rate_func = trx_ecpri_get_sample_rate;
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment