Commit b6d9bf74 authored by Joanne Hugé's avatar Joanne Hugé

Rewrite tracing, fix bugs, add simple monitoring

parent e4e825e3
...@@ -4,13 +4,14 @@ ...@@ -4,13 +4,14 @@
#define RTE_TEST_TX_DESC_DEFAULT 1024 #define RTE_TEST_TX_DESC_DEFAULT 1024
static uint16_t nb_rxd = RTE_TEST_RX_DESC_DEFAULT; static uint16_t nb_rxd = RTE_TEST_RX_DESC_DEFAULT;
static uint16_t nb_txd = RTE_TEST_TX_DESC_DEFAULT; static uint16_t nb_txd = RTE_TEST_TX_DESC_DEFAULT;
struct rte_mempool *mbuf_pool; struct rte_mempool *tx_mbuf_pool;
struct rte_mempool *rx_mbuf_pool;
struct rte_ether_addr s_addr; struct rte_ether_addr s_addr;
struct rte_ether_addr d_addr; struct rte_ether_addr d_addr;
static const struct rte_eth_conf port_conf_default = { static const struct rte_eth_conf port_conf_default = {
.rxmode = { .max_lro_pkt_size = RTE_ETHER_MAX_LEN } .rxmode = { .max_lro_pkt_size = RTE_ETHER_MAX_LEN }
}; };
static inline int port_init(int portid, struct rte_mempool *mbuf_pool) { static inline int port_init(int portid, struct rte_mempool *rx_mbuf_pool) {
struct rte_eth_conf port_conf = port_conf_default; struct rte_eth_conf port_conf = port_conf_default;
const uint16_t rx_rings = 1, tx_rings = 1; const uint16_t rx_rings = 1, tx_rings = 1;
int retval; int retval;
...@@ -23,7 +24,7 @@ static inline int port_init(int portid, struct rte_mempool *mbuf_pool) { ...@@ -23,7 +24,7 @@ static inline int port_init(int portid, struct rte_mempool *mbuf_pool) {
/* Allocate and set up 1 RX queue per Ethernet port. */ /* Allocate and set up 1 RX queue per Ethernet port. */
for (q = 0; q < rx_rings; q++) { for (q = 0; q < rx_rings; q++) {
retval = rte_eth_rx_queue_setup(portid, q, nb_rxd, retval = rte_eth_rx_queue_setup(portid, q, nb_rxd,
rte_eth_dev_socket_id(portid), NULL, mbuf_pool); rte_eth_dev_socket_id(portid), NULL, rx_mbuf_pool);
if (retval < 0) if (retval < 0)
return retval; return retval;
} }
...@@ -55,13 +56,17 @@ static void init_dpdk(int argc, char ** argv) { ...@@ -55,13 +56,17 @@ static void init_dpdk(int argc, char ** argv) {
argv += ret; argv += ret;
nb_mbufs = RTE_MAX((nb_rxd + nb_txd + BURST_SIZE + MEMPOOL_CACHE_SIZE), 8192U); nb_mbufs = RTE_MAX((nb_rxd + nb_txd + BURST_SIZE + MEMPOOL_CACHE_SIZE), 8192U);
nb_mbufs = 1024U * 16; nb_mbufs = 1024U * 16 - 1;
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", nb_mbufs, tx_mbuf_pool = rte_pktmbuf_pool_create("TX_MBUF_POOL", nb_mbufs,
MEMPOOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); MEMPOOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, SOCKET_ID_ANY);
if (mbuf_pool == NULL) if (tx_mbuf_pool == NULL)
rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n"); rte_exit(EXIT_FAILURE, "Cannot create tx mbuf pool\n");
rx_mbuf_pool = rte_pktmbuf_pool_create("RX_MBUF_POOL", nb_mbufs,
MEMPOOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, SOCKET_ID_ANY);
if (rx_mbuf_pool == NULL)
rte_exit(EXIT_FAILURE, "Cannot create rx mbuf pool\n");
if (port_init(0, mbuf_pool) != 0) if (port_init(0, rx_mbuf_pool) != 0)
rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n", 0); rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n", 0);
} }
...@@ -49,12 +49,18 @@ ...@@ -49,12 +49,18 @@
//#define DEBUG // Enables / deactivates log_debug //#define DEBUG // Enables / deactivates log_debug
//#define DST_ADDR_SYNTAX // Depends on DPDK version //#define DST_ADDR_SYNTAX // Depends on DPDK version
//#define SEND_LIMIT (1000)
//#define TRACE //#define TRACE
#define MONITOR
#define RECV_STOP_THRESHOLD 3
#define PPS_UPDATE_PERIOD INT64_C(1000000000)
#include "utils.c" #include "utils.c"
#define EFREQ 38400
#define STAT_FRAME_INTERVAL INT64_C(EFREQ * 150)
#define START_SENDING
#define RX_N_CHANNEL 1 #define RX_N_CHANNEL 1
#define TX_N_CHANNEL 4 #define TX_N_CHANNEL 4
#define FRAME_FREQ INT64_C(3840000) // Basic frame frequency #define FRAME_FREQ INT64_C(3840000) // Basic frame frequency
...@@ -70,7 +76,6 @@ ...@@ -70,7 +76,6 @@
#define STATISTIC_REFRESH_RATE INT64_C(500 * 1000 * 1000) #define STATISTIC_REFRESH_RATE INT64_C(500 * 1000 * 1000)
#define STAT_INT_LEN "9" #define STAT_INT_LEN "9"
#define TRACE_BUFFER_SIZE_MB 200
typedef struct { typedef struct {
float re; float re;
...@@ -110,27 +115,6 @@ typedef struct { ...@@ -110,27 +115,6 @@ typedef struct {
volatile int write_ahead; volatile int write_ahead;
} ring_buffer_t; } ring_buffer_t;
#ifdef TRACE
typedef struct {
int64_t size;
volatile int64_t counter;
volatile uint8_t * buffer;
} buffer_t;
static void init_buffer(buffer_t * buffer, int64_t size) {
buffer->size = size;
buffer->buffer = (uint8_t *) malloc(size);
buffer->counter = 0;
memset((uint8_t*) buffer->buffer, 0, size);
}
static void write_buffer(buffer_t * buffer, int i, uint8_t * source, int64_t len) {
if(buffer->counter + len >= buffer->size)
return;
memcpy((uint8_t*) (buffer->buffer + i + buffer->counter), source, len);
buffer->counter += len + i;
}
#endif
typedef struct { typedef struct {
int64_t count; int64_t count;
uint8_t wait; uint8_t wait;
...@@ -155,32 +139,28 @@ static volatile counter_stat_t recv_counter; // frames received from eRE ...@@ -155,32 +139,28 @@ static volatile counter_stat_t recv_counter; // frames received from eRE
static volatile counter_stat_t decode_counter; // decoded frames static volatile counter_stat_t decode_counter; // decoded frames
static volatile counter_stat_t read_counter; // frames passed to amarisoft stack static volatile counter_stat_t read_counter; // frames passed to amarisoft stack
static volatile counter_stat_t write_counter; // samples to write from TRX static volatile counter_stat_t write_counter; // samples to write from TRX
static volatile counter_stat_t encode_counter; // compressed samples static volatile counter_stat_t encode_counter; // encoded frames
static volatile counter_stat_t sent_counter; // frames sent to eRE static volatile counter_stat_t sent_counter; // frames sent to eRE
static volatile counter_stat_t rx_drop_counter; // frames sent to eRE static volatile counter_stat_t rx_drop_counter; // frames sent to eRE
static volatile counter_stat_t tx_drop_counter; // frames sent to eRE static volatile counter_stat_t tx_drop_counter; // frames sent to eRE
#define EFREQ 38400
#define STAT_FRAME_INTERVAL INT64_C(EFREQ * 150)
static volatile int sync_complete = 0; static volatile int sync_complete = 0;
static volatile int sync_happened = 0;
static int first_trx_write = 1; static int first_trx_write = 1;
static uint8_t iq_frame_full[1024]; #ifndef DPDK
static uint8_t iq_frame_empty[1024];
static uint8_t pkt_frame_full[1024]; static uint8_t pkt_frame_full[1024];
static uint8_t pkt_frame_empty[1024]; #endif
// Network
static volatile int seq_id;
#ifdef TRACE #ifdef TRACE
static buffer_t tx_trace_buffer; static volatile int rx_trace_ready = 0;
static buffer_t rx_trace_buffer; static volatile int tx_trace_ready = 1;
static buffer_t trxw_trace_buffer; static int tx_trace_index_start = 0;
static buffer_t trxr_trace_buffer;
#endif #endif
// Network
static volatile int seq_id;
static void rbuf_update_write_index(ring_buffer_t * rbuf) { static void rbuf_update_write_index(ring_buffer_t * rbuf) {
rbuf->write_index = (rbuf->write_index + 1) % rbuf->buf_len; rbuf->write_index = (rbuf->write_index + 1) % rbuf->buf_len;
} }
...@@ -328,7 +308,7 @@ static void send_packets(int port) { ...@@ -328,7 +308,7 @@ static void send_packets(int port) {
for(int i = 0; i < TX_POOL_SIZE; i++) { for(int i = 0; i < TX_POOL_SIZE; i++) {
int pkt_size; int pkt_size;
pkt[i] = rte_pktmbuf_alloc(mbuf_pool); pkt[i] = rte_pktmbuf_alloc(tx_mbuf_pool);
eth_hdr = rte_pktmbuf_mtod(pkt[i], struct rte_ether_hdr*); eth_hdr = rte_pktmbuf_mtod(pkt[i], struct rte_ether_hdr*);
if(port) { if(port) {
eth_hdr->d_addr = s_addr; eth_hdr->d_addr = s_addr;
...@@ -378,117 +358,76 @@ static void init_counter(volatile counter_stat_t * c) { ...@@ -378,117 +358,76 @@ static void init_counter(volatile counter_stat_t * c) {
c->pps_ts = 0; c->pps_ts = 0;
c->pps = 0; c->pps = 0;
} }
static void update_counter(volatile counter_stat_t * c, int64_t v) { static void update_counter_pps(volatile counter_stat_t * c) {
struct timespec _ts; struct timespec _ts;
int64_t ts; int64_t ts;
c->counter += v;
c->pps_counter += v;
if(c->pps_counter >= STAT_FRAME_INTERVAL) {
clock_gettime(CLOCK_TAI, &_ts); clock_gettime(CLOCK_TAI, &_ts);
ts = ts_to_int(_ts); ts = ts_to_int(_ts);
if((ts - c->pps_ts) > PPS_UPDATE_PERIOD) {
if(c->pps_ts) if(c->pps_ts)
c->pps = (c->pps_counter * NSEC_PER_SEC) / (ts - c->pps_ts); c->pps = ((c->counter - c->pps_counter) * NSEC_PER_SEC) / (ts - c->pps_ts);
c->pps_counter = 0; c->pps_counter = c->counter;
c->pps_ts = ts; c->pps_ts = ts;
} }
} }
#ifdef SEND_LIMIT static void update_counter(volatile counter_stat_t * c, int64_t v) {
static void send_limit_handler(struct timespec initial, TRXEcpriState * s) { c->counter += v;
struct timespec next; }
#ifdef TRACE #ifdef TRACE
if(((tx_trace_buffer.counter / 262) > SEND_LIMIT) && static void trace_handler(struct timespec initial, TRXEcpriState * s) {
((rx_trace_buffer.counter / 262) > SEND_LIMIT) && struct timespec next;
((trxw_trace_buffer.counter / 1024) > SEND_LIMIT) && if(tx_trace_ready && rx_trace_ready) {
((trxr_trace_buffer.counter / 256) > SEND_LIMIT)) {
int64_t d;
clock_gettime(CLOCK_TAI, &next);
d = calcdiff_ns(next, initial);
log_info("SEND_THREAD", "Packets sent: %" PRIi64, sent_counter.counter);
log_info("SEND_THREAD", "Duration: %" PRIi64, d);
log_info("SEND_THREAD", "FRAME_FREQ: %" PRIi64, FRAME_FREQ);
FILE * f;
char n[256];
log_info("SEND_THREAD", "tx_trace_buffer counter: %li", tx_trace_buffer.counter);
log_info("SEND_THREAD", "rx_trace_buffer counter: %li", rx_trace_buffer.counter);
log_info("SEND_THREAD", "trxw_trace_buffer counter: %li", trxw_trace_buffer.counter);
log_info("SEND_THREAD", "trxr_trace_buffer counter: %li", trxr_trace_buffer.counter);
memset(n, '\0', 256);
sprintf(n, "%s/tx.trace", s->log_directory);
f = fopen(n, "wb+");
fwrite((uint8_t*) tx_trace_buffer.buffer, tx_trace_buffer.counter, 1, f);
fclose(f);
memset(n, '\0', 256);
sprintf(n, "%s/rx.trace", s->log_directory);
f = fopen(n, "wb+");
fwrite((uint8_t*) rx_trace_buffer.buffer, rx_trace_buffer.counter, 1, f);
fclose(f);
memset(n, '\0', 256);
sprintf(n, "%s/trxr.trace", s->log_directory);
f = fopen(n, "wb+");
fwrite((uint8_t*) trxr_trace_buffer.buffer, trxr_trace_buffer.counter, 1, f);
fclose(f);
memset(n, '\0', 256);
sprintf(n, "%s/trxw.trace", s->log_directory);
f = fopen(n, "wb+");
fwrite((uint8_t*) trxw_trace_buffer.buffer, trxw_trace_buffer.counter, 1, f);
fclose(f);
#else
if((read_counter.counter > SEND_LIMIT)) {
int64_t d; int64_t d;
clock_gettime(CLOCK_TAI, &next); clock_gettime(CLOCK_TAI, &next);
d = calcdiff_ns(next, initial); d = calcdiff_ns(next, initial);
log_info("SEND_THREAD", "Packets sent: %" PRIi64, sent_counter.counter); log_info("TRACE", "Packets sent: %" PRIi64, sent_counter.counter);
log_info("SEND_THREAD", "Duration: %" PRIi64, d); log_info("TRACE", "Duration: %" PRIi64, d);
log_info("SEND_THREAD", "FRAME_FREQ: %" PRIi64, FRAME_FREQ); log_info("TRACE", "FRAME_FREQ: %" PRIi64, FRAME_FREQ);
FILE * f; FILE * f;
char n[256]; char n[256];
int wi; uint8_t ones[14];
for(int i = 0; i < 14; i++)
ones[i] = 0xff;
memset(n, '\0', 256); memset(n, '\0', 256);
sprintf(n, "%s/tx.trace", s->log_directory); sprintf(n, "%s/tx.trace", s->log_directory);
f = fopen(n, "wb+"); f = fopen(n, "wb+");
wi = (tx_rbuf.write_index - SEND_LIMIT + tx_rbuf.buf_len) % tx_rbuf.buf_len; log_info("TRACE", "Writing %d frames to tx.trace", tx_rbuf.write_index + tx_rbuf.buf_len - tx_trace_index_start);
for(int i = 0; i < SEND_LIMIT; i++) { for(int i = tx_trace_index_start; i != tx_rbuf.write_index; i = (i + 1) % tx_rbuf.buf_len) {
fwrite(pkt_frame_full, 14, 1, f); fwrite(ones, 14, 1, f);
fwrite(((uint8_t*) tx_rbuf.buffer) + wi * tx_rbuf.len, tx_rbuf.len, 1, f); fwrite(((uint8_t*) tx_rbuf.buffer) + i * tx_rbuf.len, tx_rbuf.len, 1, f);
wi = (wi + 1) % tx_rbuf.buf_len;
} }
fclose(f); fclose(f);
memset(n, '\0', 256); memset(n, '\0', 256);
sprintf(n, "%s/rx.trace", s->log_directory); sprintf(n, "%s/rx.trace", s->log_directory);
f = fopen(n, "wb+"); f = fopen(n, "wb+");
wi = (rx_rbuf.write_index - SEND_LIMIT + rx_rbuf.buf_len) % rx_rbuf.buf_len; log_info("TRACE", "Writing %d frames to rx.trace", rx_rbuf.write_index);
for(int i = 0; i < SEND_LIMIT; i++) { for(int i = 0; i < rx_rbuf.write_index; i++) {
fwrite(((uint8_t*) rx_rbuf.buffer) + wi * rx_rbuf.len, rx_rbuf.len, 1, f); fwrite(((uint8_t*) rx_rbuf.buffer) + i * rx_rbuf.len, rx_rbuf.len, 1, f);
wi = (wi + 1) % rx_rbuf.buf_len;
} }
fclose(f); fclose(f);
memset(n, '\0', 256); memset(n, '\0', 256);
sprintf(n, "%s/trxw.trace", s->log_directory); sprintf(n, "%s/trxw.trace", s->log_directory);
f = fopen(n, "wb+"); f = fopen(n, "wb+");
wi = (trxw_rbuf[0].write_index - SEND_LIMIT + trxw_rbuf[0].buf_len) % trxw_rbuf[0].buf_len; log_info("TRACE", "Writing %d frames to trxw.trace", trxw_rbuf[0].write_index);
for(int i = 0; i < SEND_LIMIT; i++) { for(int i = 0; i < trxw_rbuf[0].write_index; i++) {
for(int j = 0; j < TX_N_CHANNEL; j++) for(int j = 0; j < TX_N_CHANNEL; j++)
fwrite((uint8_t *) (((Complex *) trxw_rbuf[j].buffer) + wi * trxw_rbuf[0].len), trxw_rbuf[0].len * sizeof(Complex), 1, f); fwrite((uint8_t *) (((Complex *) trxw_rbuf[j].buffer) + i * trxw_rbuf[0].len), trxw_rbuf[0].len * sizeof(Complex), 1, f);
wi = (wi + 1) % trxw_rbuf[0].buf_len;
} }
fclose(f); fclose(f);
memset(n, '\0', 256); memset(n, '\0', 256);
sprintf(n, "%s/trxr.trace", s->log_directory); sprintf(n, "%s/trxr.trace", s->log_directory);
f = fopen(n, "wb+"); f = fopen(n, "wb+");
wi = (trxr_rbuf[0].write_index - SEND_LIMIT + trxr_rbuf[0].buf_len) % trxr_rbuf[0].buf_len; log_info("TRACE", "Writing %d frames to trxr.trace", trxr_rbuf[0].write_index);
for(int i = 0; i < SEND_LIMIT; i++) { for(int i = 0; i < trxr_rbuf[0].write_index; i++) {
for(int j = 0; j < RX_N_CHANNEL; j++) for(int j = 0; j < RX_N_CHANNEL; j++)
fwrite((uint8_t *) (((Complex *) trxr_rbuf[j].buffer) + wi * trxr_rbuf[0].len), trxr_rbuf[0].len * sizeof(Complex), 1, f); fwrite((uint8_t *) (((Complex *) trxr_rbuf[j].buffer) + i * trxr_rbuf[0].len), trxr_rbuf[0].len * sizeof(Complex), 1, f);
wi = (wi + 1) % trxr_rbuf[0].buf_len;
} }
fclose(f); fclose(f);
log_exit("", "Finished tracing");
#endif
log_exit("", "Send limit reached");
} }
} }
#endif #endif
...@@ -498,6 +437,8 @@ static void *recv_thread(void *p) { ...@@ -498,6 +437,8 @@ static void *recv_thread(void *p) {
cpu_set_t mask; cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p; TRXEcpriState * s = (TRXEcpriState *) p;
int first_seq_id = 1;
log_info("RECV_THREAD", "Thread init"); log_info("RECV_THREAD", "Thread init");
// Set thread CPU affinity // Set thread CPU affinity
CPU_ZERO(&mask); CPU_ZERO(&mask);
...@@ -540,16 +481,27 @@ static void *recv_thread(void *p) { ...@@ -540,16 +481,27 @@ static void *recv_thread(void *p) {
nr = nb_rx; nr = nb_rx;
while((nc = rbuf_contiguous_copy(NULL, &rx_rbuf, nr))) { while((nc = rbuf_contiguous_copy(NULL, &rx_rbuf, nr))) {
#ifdef TRACE
if((rx_rbuf.write_index + nc) >= rx_rbuf.buf_len) {
log_info("RECV_THREAD", "RX Trace ready");
rx_trace_ready = 1;
pthread_exit(EXIT_SUCCESS);
}
#endif
buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.write_index * rx_rbuf.len); buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.write_index * rx_rbuf.len);
for(int i = 0; i < nc; i++) { for(int i = 0; i < nc; i++) {
#ifdef DPDK #ifdef DPDK
rtebuf = (uint8_t *) (pkt[i])->buf_addr + (pkt[i])->data_off; rtebuf = (uint8_t *) (pkt[i])->buf_addr + (pkt[i])->data_off;
memcpy(buf + i * rx_rbuf.len, rtebuf + 22, rx_rbuf.len); if(first_seq_id) {
uint16_t seq_id = htons(((uint16_t *) (rtebuf + 20))[0]);
printf("seq_id = %d\n", seq_id);
first_seq_id = 0;
}
memcpy(buf + i * rx_rbuf.len, rtebuf, rx_rbuf.len);
#else #else
memcpy(buf + i * rx_rbuf.len, pkt_frame_full, rx_rbuf.len); memcpy(buf + i * rx_rbuf.len, pkt_frame_full, rx_rbuf.len);
#endif
#ifdef TRACE
write_buffer(&rx_trace_buffer, 0, rtebuf, 262);
#endif #endif
} }
rx_rbuf.write_index = (rx_rbuf.write_index + nc) % rx_rbuf.buf_len; rx_rbuf.write_index = (rx_rbuf.write_index + nc) % rx_rbuf.buf_len;
...@@ -582,16 +534,13 @@ static void *send_thread(void *p) { ...@@ -582,16 +534,13 @@ static void *send_thread(void *p) {
for(int64_t i = 1;; i++) { for(int64_t i = 1;; i++) {
#ifdef SEND_LIMIT
send_limit_handler(initial, s);
#endif
int64_t n = rbuf_read_amount(&tx_rbuf); int64_t n = rbuf_read_amount(&tx_rbuf);
if(n >= BURST_SIZE) { if(n >= BURST_SIZE) {
int nb_burst = n / BURST_SIZE; int nb_burst = n / BURST_SIZE;
for(int j = 0; j < nb_burst; j++) { for(int j = 0; j < nb_burst; j++) {
for(int k = 0; k < BURST_SIZE; k++) { for(int k = 0; k < BURST_SIZE; k++) {
memcpy(tx_data[k], RBUF_READ0(tx_rbuf, uint8_t), tx_rbuf.len); memcpy(tx_data[k], RBUF_READ0(tx_rbuf, uint8_t), tx_rbuf.len);
rbuf_update_read_index(&tx_rbuf); rbuf_update_read_index(&tx_rbuf);
...@@ -622,7 +571,6 @@ static void *encode_thread(void *p) { ...@@ -622,7 +571,6 @@ static void *encode_thread(void *p) {
cpu_set_t mask; cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p; TRXEcpriState * s = (TRXEcpriState *) p;
int trx_started = 0;
struct timespec next; struct timespec next;
int64_t target_counter = 0; int64_t target_counter = 0;
int reset_encode_counter = 1; int reset_encode_counter = 1;
...@@ -633,7 +581,6 @@ static void *encode_thread(void *p) { ...@@ -633,7 +581,6 @@ static void *encode_thread(void *p) {
if (sched_setaffinity(0, sizeof(mask), &mask)) if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->encode_affinity); error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->encode_affinity);
#if 1
for(int64_t i = 0;; i++) { for(int64_t i = 0;; i++) {
int n; int n;
...@@ -641,6 +588,9 @@ static void *encode_thread(void *p) { ...@@ -641,6 +588,9 @@ static void *encode_thread(void *p) {
encode_counter.counter = 0; encode_counter.counter = 0;
reset_encode_counter = 0; reset_encode_counter = 0;
seq_id = 0; seq_id = 0;
#ifdef TRACE
tx_trace_index_start = tx_rbuf.write_index;
#endif
} }
// If we have frames to encode (is there space in TX buffer) // If we have frames to encode (is there space in TX buffer)
...@@ -649,7 +599,6 @@ static void *encode_thread(void *p) { ...@@ -649,7 +599,6 @@ static void *encode_thread(void *p) {
// If there are frames from trx_write callback to encode // If there are frames from trx_write callback to encode
if(rbuf_read_amount(&trxw_rbuf[0]) && rbuf_read_amount(&trxw_group_rbuf)) { if(rbuf_read_amount(&trxw_rbuf[0]) && rbuf_read_amount(&trxw_group_rbuf)) {
sample_group_t * g; int nb_frames; sample_group_t * g; int nb_frames;
trx_started = 1;
g = RBUF_READ0(trxw_group_rbuf, sample_group_t); g = RBUF_READ0(trxw_group_rbuf, sample_group_t);
if(g->wait) { if(g->wait) {
...@@ -659,14 +608,17 @@ static void *encode_thread(void *p) { ...@@ -659,14 +608,17 @@ static void *encode_thread(void *p) {
} }
nb_frames = g->count > n ? n : g->count; nb_frames = g->count > n ? n : g->count;
g->count -= nb_frames; g->count -= nb_frames;
#ifdef TRACE
if((encode_counter.counter + nb_frames) >= tx_rbuf.buf_len) {
log_info("ENCODE_THREAD", "TX Trace ready");
tx_trace_ready = 1;
pthread_exit(EXIT_SUCCESS);
}
#endif
if(g->zeroes) { if(g->zeroes) {
for(int j = 0; j < nb_frames; j++) { for(int j = 0; j < nb_frames; j++) {
memset(RBUF_WRITE0(tx_rbuf, uint8_t) + 8, 0x00, 240); memset(RBUF_WRITE0(tx_rbuf, uint8_t) + 8, 0x00, 240);
*((uint16_t *) (RBUF_WRITE0(tx_rbuf, uint8_t) + 6)) = htons(seq_id++); *((uint16_t *) (RBUF_WRITE0(tx_rbuf, uint8_t) + 6)) = htons(seq_id++);
#ifdef TRACE
//write_buffer(&tx_trace_buffer, 14, (uint8_t*) RBUF_WRITE0(tx_rbuf, uint8_t), 248);
write_buffer(&tx_trace_buffer, 14, (uint8_t*) pkt_frame_full, 248);
#endif
rbuf_update_write_index(&tx_rbuf); rbuf_update_write_index(&tx_rbuf);
} }
trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + nb_frames) % trxw_rbuf[0].buf_len; trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + nb_frames) % trxw_rbuf[0].buf_len;
...@@ -683,13 +635,9 @@ static void *encode_thread(void *p) { ...@@ -683,13 +635,9 @@ static void *encode_thread(void *p) {
for(int i = 0; i < nc; i++) { for(int i = 0; i < nc; i++) {
for(int i = 0; i < TX_N_CHANNEL ; i++) for(int i = 0; i < TX_N_CHANNEL ; i++)
encode_s64_b60_2(buf + i * 60, (float *) iq_samples[i]); encode_s64_b60_2(buf + i * 60, (float *) iq_samples[i]);
*((uint16_t *)(buf + 6)) = htons(seq_id++); *((uint16_t *)(buf - 2)) = htons(seq_id++);
for(int j = 0; j < TX_N_CHANNEL; j++) for(int j = 0; j < TX_N_CHANNEL; j++)
iq_samples[j] += trxw_rbuf[0].len; iq_samples[j] += trxw_rbuf[0].len;
#ifdef TRACE
//write_buffer(&tx_trace_buffer, 14, buf - 8, 248);
write_buffer(&tx_trace_buffer, 14, (uint8_t*) pkt_frame_empty, 248);
#endif
buf += tx_rbuf.len; buf += tx_rbuf.len;
} }
tx_rbuf.write_index = (tx_rbuf.write_index + nc) % tx_rbuf.buf_len; tx_rbuf.write_index = (tx_rbuf.write_index + nc) % tx_rbuf.buf_len;
...@@ -708,8 +656,8 @@ static void *encode_thread(void *p) { ...@@ -708,8 +656,8 @@ static void *encode_thread(void *p) {
} }
else { else {
// Send empty frames until we receive something // Send empty frames until we receive something
#if 1 #ifdef START_SENDING
if(!trx_started && !sync_complete) { if(!sync_complete) {
if(i == 0) if(i == 0)
clock_gettime(CLOCK_TAI, &next); clock_gettime(CLOCK_TAI, &next);
// Limit packets sent // Limit packets sent
...@@ -732,14 +680,13 @@ static void *encode_thread(void *p) { ...@@ -732,14 +680,13 @@ static void *encode_thread(void *p) {
} }
} }
pthread_exit(EXIT_SUCCESS); pthread_exit(EXIT_SUCCESS);
#endif
} }
static void *decode_thread(void *p) { static void *decode_thread(void *p) {
cpu_set_t mask; cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p; TRXEcpriState * s = (TRXEcpriState *) p;
log_info("DECOMPRESS_THREAD", "Thread init"); log_info("DECODE_THREAD", "Thread init");
// Set thread CPU affinity // Set thread CPU affinity
CPU_ZERO(&mask); CPU_ZERO(&mask);
CPU_SET(s->decode_affinity, &mask); CPU_SET(s->decode_affinity, &mask);
...@@ -756,6 +703,14 @@ static void *decode_thread(void *p) { ...@@ -756,6 +703,14 @@ static void *decode_thread(void *p) {
uint8_t * buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.read_index * rx_rbuf.len) + 22; uint8_t * buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.read_index * rx_rbuf.len) + 22;
#ifdef TRACE
if((trxr_rbuf[0].write_index + nc) >= trxr_rbuf[0].buf_len) {
rx_trace_ready = 1;
log_info("DECODE_THREAD", "RX Trace ready");
pthread_exit(EXIT_SUCCESS);
}
#endif
Complex * iq_samples[4]; Complex * iq_samples[4];
for(int i = 0; i < RX_N_CHANNEL; i++) for(int i = 0; i < RX_N_CHANNEL; i++)
iq_samples[i] = (((Complex *) trxr_rbuf[i].buffer) + (trxr_rbuf[0].write_index * trxr_rbuf[0].len)); iq_samples[i] = (((Complex *) trxr_rbuf[i].buffer) + (trxr_rbuf[0].write_index * trxr_rbuf[0].len));
...@@ -777,6 +732,7 @@ static void *decode_thread(void *p) { ...@@ -777,6 +732,7 @@ static void *decode_thread(void *p) {
} }
static void *statistic_thread(void *p) { static void *statistic_thread(void *p) {
struct timespec next, initial; struct timespec next, initial;
int64_t recv_stop = 0;
cpu_set_t mask; cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p; TRXEcpriState * s = (TRXEcpriState *) p;
FILE * stats_file_desc; FILE * stats_file_desc;
...@@ -801,6 +757,9 @@ static void *statistic_thread(void *p) { ...@@ -801,6 +757,9 @@ static void *statistic_thread(void *p) {
next = initial; next = initial;
for(int64_t i = 0;; i++) { for(int64_t i = 0;; i++) {
add_ns(&next, STATISTIC_REFRESH_RATE); add_ns(&next, STATISTIC_REFRESH_RATE);
#ifdef TRACE
trace_handler(initial, s);
#endif
print_stats(stats_file_desc, (i % 50) == 0); print_stats(stats_file_desc, (i % 50) == 0);
#ifdef DEBUG #ifdef DEBUG
fprintf(stats_file_desc, fprintf(stats_file_desc,
...@@ -816,6 +775,31 @@ static void *statistic_thread(void *p) { ...@@ -816,6 +775,31 @@ static void *statistic_thread(void *p) {
fprintf(stats_file_desc, "TRXW RBUF: ri %d wi %d ra %d wa %d\n", trxw_rbuf[0].read_index, trxw_rbuf[0].write_index, rbuf_read_amount(&trxw_rbuf[0]), rbuf_write_amount(&trxw_rbuf[0])); fprintf(stats_file_desc, "TRXW RBUF: ri %d wi %d ra %d wa %d\n", trxw_rbuf[0].read_index, trxw_rbuf[0].write_index, rbuf_read_amount(&trxw_rbuf[0]), rbuf_write_amount(&trxw_rbuf[0]));
#endif #endif
fflush(stats_file_desc); fflush(stats_file_desc);
update_counter_pps(&rx_drop_counter);
update_counter_pps(&tx_drop_counter);
update_counter_pps(&recv_counter);
update_counter_pps(&decode_counter);
update_counter_pps(&read_counter);
update_counter_pps(&write_counter);
update_counter_pps(&encode_counter);
update_counter_pps(&sent_counter);
#ifdef MONITOR
if(recv_counter.pps < 3000000) {
struct timespec _ts;
int64_t ts;
clock_gettime(CLOCK_MONOTONIC, &_ts);
ts = ts_to_int(_ts);
if(sync_happened && (recv_stop && ((ts - recv_stop) > RECV_STOP_THRESHOLD * INT64_C(1000000000)))) {
log_info("MONITOR", "Stopped recieving packets, sending again...");
sync_complete = 0;
recv_stop = 0;
}
if(!recv_stop)
recv_stop = ts;
}
#endif
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL); clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
} }
pthread_exit(EXIT_SUCCESS); pthread_exit(EXIT_SUCCESS);
...@@ -930,19 +914,10 @@ int startdpdk(TRXEcpriState * s) { ...@@ -930,19 +914,10 @@ int startdpdk(TRXEcpriState * s) {
int prev_space = -1; int prev_space = -1;
char ** argv; char ** argv;
for(int i = 0; i < 256; i++) { #ifndef DPDK
iq_frame_full[i * 4 + 0] = 0x00;
iq_frame_full[i * 4 + 1] = 0x00;
iq_frame_full[i * 4 + 2] = 0x00;
iq_frame_full[i * 4 + 3] = 0x3f;
}
for(int i = 0; i < 1024; i++)
iq_frame_empty[i] = 0x00;
for(int i = 0; i < 262; i++)
pkt_frame_empty[i] = 0x00;
for(int i = 0; i < 262; i++) for(int i = 0; i < 262; i++)
pkt_frame_full[i] = 0xff; pkt_frame_full[i] = 0xff;
#endif
for(int i = 0;; i++) { for(int i = 0;; i++) {
if(s->dpdk_options[i] == ' ') if(s->dpdk_options[i] == ' ')
...@@ -1026,7 +1001,7 @@ int startdpdk(TRXEcpriState * s) { ...@@ -1026,7 +1001,7 @@ int startdpdk(TRXEcpriState * s) {
*((uint16_t *) (ecpri_message + 2)) = htons(244); *((uint16_t *) (ecpri_message + 2)) = htons(244);
*((uint16_t *) (ecpri_message + 4)) = htons(s->flow_id); *((uint16_t *) (ecpri_message + 4)) = htons(s->flow_id);
for(int i = 0; i < TXRX_BUF_MAX_SIZE; i++) for(int i = 0; i < tx_rbuf.buf_len; i++)
memcpy(((uint8_t *) tx_rbuf.buffer) + (i * tx_rbuf.len), ecpri_message, tx_rbuf.len); memcpy(((uint8_t *) tx_rbuf.buffer) + (i * tx_rbuf.len), ecpri_message, tx_rbuf.len);
start_threads(s); start_threads(s);
...@@ -1054,7 +1029,7 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void ...@@ -1054,7 +1029,7 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
log_debug("TRX_ECPRI_WRITE", "trx_ecpri_write, count = %ld", count); log_debug("TRX_ECPRI_WRITE", "trx_ecpri_write, count = %ld", count);
if(prev_count && (ts - prev_ts) != prev_count) { if(prev_count && ((ts - prev_ts) != prev_count)) {
log_exit("TRX_ECPRI_WRITE", "Gap between timestamps: prev_ts %li ts %li prev_count %li count %li diff_ts %li", prev_ts, ts, prev_count, count, (ts - prev_ts)); log_exit("TRX_ECPRI_WRITE", "Gap between timestamps: prev_ts %li ts %li prev_count %li count %li diff_ts %li", prev_ts, ts, prev_count, count, (ts - prev_ts));
} }
prev_ts = ts; prev_count = write_count; prev_ts = ts; prev_count = write_count;
...@@ -1065,6 +1040,14 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void ...@@ -1065,6 +1040,14 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
return; return;
} }
#ifdef TRACE
if((trxw_rbuf[0].write_index + write_count) >= trxw_rbuf[0].buf_len) {
log_info("TRX_ECPRI_WRITE", "TX Trace ready");
tx_trace_ready = 1;
pthread_exit(EXIT_SUCCESS);
}
#endif
if(first_trx_write) { if(first_trx_write) {
sample_group_t * g2 = RBUF_WRITE0(trxw_group_rbuf, sample_group_t); sample_group_t * g2 = RBUF_WRITE0(trxw_group_rbuf, sample_group_t);
g2->count = ts; g2->count = ts;
...@@ -1080,14 +1063,10 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void ...@@ -1080,14 +1063,10 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
while((nc = rbuf_contiguous_copy(NULL, &trxw_rbuf[0], write_count))) { while((nc = rbuf_contiguous_copy(NULL, &trxw_rbuf[0], write_count))) {
if(__samples) if(__samples)
for(int i = 0; i < TX_N_CHANNEL; i++) for(int i = 0; i < TX_N_CHANNEL; i++)
memcpy((((float *) trxw_rbuf[i].buffer) + trxw_rbuf[0].write_index * trxw_rbuf[0].len * 2), _samples[i], nc * trxw_rbuf[0].len * 2); memcpy(((uint8_t *) trxw_rbuf[i].buffer) + trxw_rbuf[0].write_index * trxw_rbuf[0].len * sizeof(Complex), (uint8_t*) _samples[i], nc * trxw_rbuf[0].len * sizeof(Complex));
trxw_rbuf[0].write_index = (trxw_rbuf[0].write_index + nc) % trxw_rbuf[0].buf_len; trxw_rbuf[0].write_index = (trxw_rbuf[0].write_index + nc) % trxw_rbuf[0].buf_len;
write_count -= nc; write_count -= nc;
} }
#ifdef TRACE
for(int i = 0; i * M < count; i++)
write_buffer(&trxw_trace_buffer, 0, __samples ? ((uint8_t*) iq_frame_full) : ((uint8_t *)iq_frame_empty), 1024);
#endif
rbuf_update_write_index(&trxw_group_rbuf); rbuf_update_write_index(&trxw_group_rbuf);
update_counter(&write_counter, count / M); update_counter(&write_counter, count / M);
...@@ -1106,21 +1085,14 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa ...@@ -1106,21 +1085,14 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa
log_debug("TRX_ECPRI_READ", "count = %ld (%li)", read_count, read_counter.counter); log_debug("TRX_ECPRI_READ", "count = %ld (%li)", read_count, read_counter.counter);
sync_complete = 1; sync_complete = 1;
sync_happened = 1;
n = read_count; n = read_count;
while((nc = rbuf_contiguous_copy(&trxr_rbuf[0], NULL, n))) { while((nc = rbuf_contiguous_copy(&trxr_rbuf[0], NULL, n))) {
int len = nc * trxr_rbuf[0].len * 2; int len = nc * trxr_rbuf[0].len * sizeof(Complex);
for(int i = 0; i < RX_N_CHANNEL; i++ ) { for(int i = 0; i < RX_N_CHANNEL; i++ ) {
memcpy(_samples[i] + offset, (((float *) trxr_rbuf[i].buffer) + trxr_rbuf[0].read_index * trxr_rbuf[0].len * 2), len); memcpy((uint8_t*) (_samples[i] + offset), ((uint8_t *) trxr_rbuf[i].buffer) + trxr_rbuf[0].read_index * trxr_rbuf[0].len * sizeof(Complex), len);
}
#ifdef TRACE
for(int i = 0; i < nc; i++) {
int l = 64 * sizeof(float);
for(int j = 0; j < RX_N_CHANNEL; j++ ) {
write_buffer(&trxr_trace_buffer, 0, (uint8_t*) (_samples[j] + offset + i * 64), l);
}
} }
#endif
trxr_rbuf[0].read_index = (trxr_rbuf[0].read_index + nc) % trxr_rbuf[0].buf_len; trxr_rbuf[0].read_index = (trxr_rbuf[0].read_index + nc) % trxr_rbuf[0].buf_len;
n -= nc; n -= nc;
offset += len; offset += len;
...@@ -1160,13 +1132,6 @@ static int trx_ecpri_start(TRXState *s1, const TRXDriverParams *params) ...@@ -1160,13 +1132,6 @@ static int trx_ecpri_start(TRXState *s1, const TRXDriverParams *params)
void dummy_enb_init(TRXState *s1, TRXEcpriState *s) { void dummy_enb_init(TRXState *s1, TRXEcpriState *s) {
#ifdef TRACE
init_buffer(&rx_trace_buffer, TRACE_BUFFER_SIZE_MB * 1000000);
init_buffer(&tx_trace_buffer, TRACE_BUFFER_SIZE_MB * 1000000);
init_buffer(&trxw_trace_buffer, TRACE_BUFFER_SIZE_MB * 1000000);
init_buffer(&trxr_trace_buffer, TRACE_BUFFER_SIZE_MB * 1000000);
#endif
s1->trx_write_func2 = trx_ecpri_write; s1->trx_write_func2 = trx_ecpri_write;
s1->trx_read_func2 = trx_ecpri_read; s1->trx_read_func2 = trx_ecpri_read;
startdpdk(s); startdpdk(s);
...@@ -1193,13 +1158,6 @@ int trx_driver_init(TRXState *s1) ...@@ -1193,13 +1158,6 @@ int trx_driver_init(TRXState *s1)
s = malloc(sizeof(TRXEcpriState)); s = malloc(sizeof(TRXEcpriState));
memset(s, 0, sizeof(*s)); memset(s, 0, sizeof(*s));
#ifdef TRACE
init_buffer(&rx_trace_buffer, TRACE_BUFFER_SIZE_MB * 1000000);
init_buffer(&tx_trace_buffer, TRACE_BUFFER_SIZE_MB * 1000000);
init_buffer(&trxw_trace_buffer, TRACE_BUFFER_SIZE_MB * 1000000);
init_buffer(&trxr_trace_buffer, TRACE_BUFFER_SIZE_MB * 1000000);
#endif
trx_get_param_double(s1, &val, "recv_affinity"); trx_get_param_double(s1, &val, "recv_affinity");
s->recv_affinity = (int) val; s->recv_affinity = (int) val;
trx_get_param_double(s1, &val, "send_affinity"); trx_get_param_double(s1, &val, "send_affinity");
......
...@@ -36,7 +36,7 @@ static inline void log_limit(const char * section, const char * msg, ...) { ...@@ -36,7 +36,7 @@ static inline void log_limit(const char * section, const char * msg, ...) {
puts(line); puts(line);
} }
#if 0 #if 1
static void log_info(const char * section, const char * msg, ...) { static void log_info(const char * section, const char * msg, ...) {
time_t t; time_t t;
struct tm ts; struct tm ts;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment