#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <error.h>
#include <fcntl.h>
#include <getopt.h>
#include <immintrin.h>
#include <inttypes.h>
#include <limits.h>
#include <linux/if_packet.h>
#include <math.h>
#include <netdb.h>
#include <netinet/ether.h>
#include <netinet/in.h>
#include <net/if.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <signal.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

// Build switch: when defined, the real DPDK RX/TX paths are compiled in;
// otherwise the stubbed code paths guarded by #ifndef DPDK are used.
#define DPDK

#ifdef DPDK
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_ether.h>
#include <rte_cycles.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <rte_ether.h>
#include <rte_ip.h>
#include <rte_udp.h>
#endif

#include "private/trx_driver.h"
#include "utils.c"

//#define DEBUG // Enables / deactivates log_debug
//#define STAT_SHOW_COUNTERS

// Minimum interval (ns) between two packets-per-second recomputations
#define PPS_UPDATE_PERIOD INT64_C(1500000000)
#define MAX_PACKET_SIZE 1024
#define N_SAMPLES (32)
#define TRX_MAX_GROUP 1500
// Column width (as a string) used to build the statistics format strings
#define STAT_INT_LEN "8"

//#define DST_ADDR_SYNTAX // Depends on DPDK version

// One IQ sample: real and imaginary parts as floats
typedef struct {
  float re;
  float im;
} Complex;

// Runtime configuration shared by all threads of the driver
typedef struct {
  // Log
  const uint8_t * log_directory;
  // Network
  const uint8_t * re_mac;
  const uint8_t * rec_mac;
  const uint8_t * rec_if;
  const char * dpdk_options;
  // RF
  int rx_n_channel;
  int tx_n_channel;
  int frame_frequency;
  int tdd_period;
  int sample_rate;
  // Performance / RT
  int recv_affinity;
  int send_affinity;
  int encode_affinity;
  int decode_affinity;
  int statistic_affinity;
  int ecpri_period;
  int trx_buf_size;
  int txrx_buf_size;
  int encode_burst;
  int send_burst;
  int statistics_refresh_rate_ns;
  // Trace / Monitor
  int trace_rx;
  int trace_tx;
  int trace_offset;
  int monitor_pps;
  int monitor_trigger_duration;
  // eCPRI
  int flow_id;
  int start_receiving;
  int master;
  int trx_read_null;
  int one_way_measure;
  int one_way_period;
  int tdd_frame_start;
  int timestamp_frames;
  int master_timestamp_check_period;
  int slave_timestamp_check_period;
  int disable_rx_copy;
} TRXEcpriState;

// Event counter plus the bookkeeping needed to derive a rate from it
typedef struct {
  int64_t counter;     // running total
  int64_t pps_counter; // value of counter at the last rate update
  int64_t pps_ts;      // timestamp (ns) of the last rate update
  int64_t pps;         // last computed rate, in events per second
} counter_stat_t;

// Ring buffer of buf_len slots, each slot holding len elements.
// Indices are slot numbers and wrap modulo buf_len.
typedef struct {
  volatile void * buffer;
  char name[64];
  int buf_len;
  int len;
  volatile int write_index;
  volatile int read_index;
  volatile int write_ahead;
} ring_buffer_t;

// Group of frames queued for encoding (consumed by encode_trx)
typedef struct {
  int64_t count;  // frames left to encode in this group
  uint8_t wait;   // when set, count is rebased on trx_encode_counter before use
  uint8_t zeroes; // when set, the group is encoded as empty frames
} sample_group_t;

/* Proprietary code:
   - compression / decompression of IQ samples
   - fast conversion between int16_t and float
*/
#include "private/bf1_avx2.c"
//#include "private/bf1_avx2_nop.c"

// Buffers
static ring_buffer_t rx_rbuf; // Received packets
static ring_buffer_t trxr_rbuf[4]; // Decoded IQ samples
static ring_buffer_t tx_rbuf; // Packets to send
static ring_buffer_t trxw_rbuf[4]; // Uncompressed IQ samples
static ring_buffer_t trxw_group_rbuf; // Group of IQ samples
static ring_buffer_t one_way_rbuf; // One way delay measurements

// Counters
static volatile counter_stat_t recv_counter; // frames received from eRE
static volatile counter_stat_t decode_counter; // decoded frames
static volatile counter_stat_t read_counter; // frames passed to amarisoft stack
static volatile counter_stat_t write_counter; // samples to write from TRX
static volatile counter_stat_t encode_counter; // encoded frames
static volatile counter_stat_t trx_encode_counter; // frames encoded from TRX sample groups
static volatile counter_stat_t sent_counter; // frames sent to eRE
static volatile counter_stat_t rx_drop_counter; // received frames freed because rx_rbuf had no room
static volatile counter_stat_t tx_drop_counter; // NOTE(review): only initialised and printed in the code visible here
static volatile counter_stat_t empty_encode_counter; // empty frames encoded in master mode

static volatile int sync_complete = 0;
static volatile int received_pkts = 0;
static volatile int recv_pps_threshold_hit = 0;
static int first_trx_write = 1;

#ifndef DPDK
static uint8_t
pkt_frame_full[1024]; #endif static volatile int rx_trace_ready = 0; static volatile int tx_trace_ready = 0; static int64_t encode_counter_prev = 0; static int64_t decode_counter_prev = 0; static int recv_pps_threshold; //DEBUG static volatile int min_count_trx = 1000000; // Network static volatile int tx_seq_id; static volatile int rx_seq_id; static volatile uint64_t one_way_timestamp; static volatile int64_t rxtx_shift; static volatile int64_t slave_ts_offset; static int64_t tx_initial_ts; static int64_t sync_encode_counter; static void rbuf_update_write_index(ring_buffer_t * rbuf) { rbuf->write_index = (rbuf->write_index + 1) % rbuf->buf_len; } static void rbuf_update_read_index(ring_buffer_t * rbuf) { rbuf->read_index = (rbuf->read_index + 1) % rbuf->buf_len; } static int rbuf_read_amount(ring_buffer_t * rbuf) { return (rbuf->write_index + rbuf->buf_len - rbuf->read_index) % rbuf->buf_len; } static int rbuf_write_amount(ring_buffer_t * rbuf) { // Don't write everything to avoid write index catching up to read index // That we way we don't have to use locks return ((rbuf->read_index + rbuf->buf_len - rbuf->write_index - 1) % rbuf->buf_len); } static int rbuf_contiguous_copy(ring_buffer_t * rbuf1, ring_buffer_t * rbuf2, int n) { int ret = n; if(rbuf1) { n = rbuf1->buf_len - rbuf1->read_index; ret = n < ret ? n : ret; } if(rbuf2) n = rbuf2->buf_len - rbuf2->write_index; return n < ret ? 
n : ret; } #define RBUF_READ0(rbuf, type) (((type *) rbuf.buffer) + (rbuf.read_index * rbuf.len)) #define RBUF_WRITE0(rbuf, type) (((type *) rbuf.buffer) + (rbuf.write_index * rbuf.len)) #define RBUF_READ(rbuf, i, type) (((type *) rbuf.buffer) + (((rbuf.read_index + i) % rbuf.buf_len) * rbuf.len)) #define RBUF_WRITE(rbuf, i, type) (((type *) rbuf.buffer) + (((rbuf.write_index + i) % rbuf.buf_len) * rbuf.len)) #define RBUF_INIT(rbuf, _name, _buf_len, _len, type) do\ {\ log_debug("TRX_ECPRI", "Allocating %s with %d bytes\n", _name, (((int) _buf_len) * ((int) _len) * sizeof(type)));\ rbuf.buffer = (type *) malloc(((int) _buf_len) * ((int) _len) * sizeof(type));\ strcpy(rbuf.name, _name);\ rbuf.buf_len = _buf_len;\ rbuf.len = _len;\ rbuf.write_index = 0;\ rbuf.read_index = 0;\ rbuf.write_ahead = 0;\ } while(0) static void print_stats(FILE * f, int print_header) { if(print_header) { fprintf(f, "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " #ifdef STAT_SHOW_COUNTERS "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " #endif "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "\n", "RX", "TX", #ifdef STAT_SHOW_COUNTERS "RECEIVED", "DECODE", "READ", "WRITE", "ENCODE", "SENT", "EMPTY TX", #endif "RECEIVED", "DECODE", "READ", "WRITE", "ENCODE", "SENT", "EMPTY TX", "RX", "TRXR", "TRXW", "TX", "SLAVE", "MASTER"); fprintf(f, "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " #ifdef STAT_SHOW_COUNTERS "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " #endif "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " 
"%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s " "\n", "DROPPED", "DROPPED", #ifdef STAT_SHOW_COUNTERS "COUNTER", "COUNTER", "COUNTER", "COUNTER", "COUNTER", "COUNTER", "COUNTER", #endif "PPS", "PPS", "PPS", "PPS", "PPS", "PPS", "PPS", "DELAY", "DELAY", "DELAY", "DELAY", "", ""); } fprintf(f, "%-" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " " #ifdef STAT_SHOW_COUNTERS "%-" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " " #endif "%-" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "d " "%-" STAT_INT_LEN "d " "%-" STAT_INT_LEN "d " "%-" STAT_INT_LEN "d " "%-" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " " "\n", rx_drop_counter.counter, tx_drop_counter.counter, #ifdef STAT_SHOW_COUNTERS recv_counter.counter, decode_counter.counter, read_counter.counter, write_counter.counter, encode_counter.counter, sent_counter.counter, empty_encode_counter.counter, #endif recv_counter.pps, decode_counter.pps, read_counter.pps, write_counter.pps, encode_counter.pps, sent_counter.pps, empty_encode_counter.pps, rbuf_read_amount(&rx_rbuf), rbuf_read_amount(&trxr_rbuf[0]), rbuf_read_amount(&trxw_rbuf[0]), rbuf_read_amount(&tx_rbuf), rxtx_shift, slave_ts_offset); } static void log_exit(const char * section, const char * msg, ...) 
{ time_t t; struct tm ts; char line[256]; va_list arglist; time(&t); ts = *localtime(&t); strftime(line, 80, "%m-%d %H:%M:%S", &ts); sprintf(line + strlen(line), " EXIT [%s] ", section); va_start(arglist, msg); vsprintf(line + strlen(line), msg, arglist); va_end(arglist); fprintf(stderr, "%s\n", line); // Dump useful information print_stats(stderr, 1); fprintf(stderr, "TX RBUF: ri %d wi %d ra %d wa %d\n", tx_rbuf.read_index, tx_rbuf.write_index, rbuf_read_amount(&tx_rbuf), rbuf_write_amount(&tx_rbuf)); fprintf(stderr, "RX RBUF: ri %d wi %d ra %d wa %d\n", rx_rbuf.read_index, rx_rbuf.write_index, rbuf_read_amount(&rx_rbuf), rbuf_write_amount(&rx_rbuf)); fprintf(stderr, "TRXW RBUF: ri %d wi %d ra %d wa %d\n", trxw_rbuf[0].read_index, trxw_rbuf[0].write_index, rbuf_read_amount(&trxw_rbuf[0]), rbuf_write_amount(&trxw_rbuf[0])); fprintf(stderr, "TRXR RBUF: ri %d wi %d ra %d wa %d\n", trxr_rbuf[0].read_index, trxr_rbuf[0].write_index, rbuf_read_amount(&trxr_rbuf[0]), rbuf_write_amount(&trxr_rbuf[0])); fprintf(stderr, "TRXW GROUP RBUF: ri %d wi %d ra %d wa %d\n", trxw_group_rbuf.read_index, trxw_group_rbuf.write_index, rbuf_read_amount(&trxw_group_rbuf), rbuf_write_amount(&trxw_group_rbuf)); fflush(stdout); fflush(stderr); exit(EXIT_FAILURE); } #define BURST_SIZE 16 #define TX_POOL_SIZE 16 #ifdef DPDK #include "dpdk.c" /* DPDK */ #else struct rte_mbuf { int buf_addr; int data_off; }; void rte_pktmbuf_free(void * pkt) { (void) pkt; //for(int i = 0; i < 1000; i ++) // asm("NOP"); } #endif static void init_counter(volatile counter_stat_t * c) { c->counter = 0; c->pps_counter = 0; c->pps_ts = 0; c->pps = 0; } static void update_counter_pps(volatile counter_stat_t * c) { struct timespec _ts; int64_t ts; clock_gettime(CLOCK_TAI, &_ts); ts = ts_to_int(_ts); if((ts - c->pps_ts) > PPS_UPDATE_PERIOD) { if(c->pps_ts) c->pps = ((c->counter - c->pps_counter) * NSEC_PER_SEC) / (ts - c->pps_ts); c->pps_counter = c->counter; c->pps_ts = ts; } } static void update_counter(volatile 
counter_stat_t * c, int64_t v) { c->counter += v; } #define GETTIME_MAX_C INT64_C(7000000000) static int64_t gettime(int64_t initial_ts, int64_t counter, int64_t freq) { int64_t ret = initial_ts; for(; counter >= GETTIME_MAX_C; counter -= GETTIME_MAX_C) ret += GETTIME_MAX_C * NSEC_PER_SEC / freq; ret += (counter * NSEC_PER_SEC / freq); return ret; } static void trace_handler(struct timespec initial, TRXEcpriState * s) { struct timespec next; int ready = 1; if(s->trace_tx) ready &= tx_trace_ready; if(s->trace_rx) ready &= rx_trace_ready; if(ready) { int64_t d; FILE * f; char n[256]; int start; uint8_t ones[14]; for(int i = 0; i < 14; i++) ones[i] = 0xff; clock_gettime(CLOCK_TAI, &next); d = calcdiff_ns(next, initial); log_info("TRACE", "Packets sent: %" PRIi64, sent_counter.counter); log_info("TRACE", "Duration: %" PRIi64, d); usleep(1000 * 200); memset(n, '\0', 256); sprintf(n, "%s/tx.trace", s->log_directory); f = fopen(n, "wb+"); start = (s->trace_offset + encode_counter_prev) % tx_rbuf.buf_len; log_info("TRACE", "Writing %d frames to tx.trace", (tx_rbuf.write_index + tx_rbuf.buf_len - start) % tx_rbuf.buf_len); for(int i = start; i != tx_rbuf.write_index; i = (i + 1) % tx_rbuf.buf_len) { fwrite(ones, 14, 1, f); fwrite(((uint8_t*) tx_rbuf.buffer) + i * tx_rbuf.len, tx_rbuf.len, 1, f); } fclose(f); memset(n, '\0', 256); sprintf(n, "%s/trxw.trace", s->log_directory); f = fopen(n, "wb+"); start = (s->trace_offset) % trxw_rbuf[0].buf_len; log_info("TRACE", "Writing %d frames to trxw.trace", (trxw_rbuf[0].write_index + trxw_rbuf[0].buf_len - start) % trxw_rbuf[0].buf_len); for(int i = start; i != trxw_rbuf[0].write_index; i = (i + 1) % trxw_rbuf[0].buf_len) { for(int j = 0; j < s->tx_n_channel; j++) fwrite((uint8_t *) (((Complex *) trxw_rbuf[j].buffer) + i * trxw_rbuf[0].len), trxw_rbuf[0].len * sizeof(Complex), 1, f); } fclose(f); memset(n, '\0', 256); sprintf(n, "%s/rx.trace", s->log_directory); f = fopen(n, "wb+"); start = s->trace_offset % rx_rbuf.buf_len; 
log_info("TRACE", "Writing %d frames to rx.trace", (rx_rbuf.write_index + rx_rbuf.buf_len - start) % rx_rbuf.buf_len); for(int i = start; i != rx_rbuf.write_index; i = (i + 1) % rx_rbuf.buf_len) { fwrite(((uint8_t*) rx_rbuf.buffer) + i * rx_rbuf.len, rx_rbuf.len, 1, f); } fclose(f); memset(n, '\0', 256); sprintf(n, "%s/trxr.trace", s->log_directory); f = fopen(n, "wb+"); start = (s->trace_offset + decode_counter_prev) % trxr_rbuf[0].buf_len; log_info("TRACE", "Writing %d frames to trxr.trace", (trxr_rbuf[0].write_index + trxr_rbuf[0].buf_len - start) % trxr_rbuf[0].buf_len); for(int i = start; i != trxr_rbuf[0].write_index; i = (i + 1) % trxr_rbuf[0].buf_len) { for(int j = 0; j < s->rx_n_channel; j++) fwrite((uint8_t *) (((Complex *) trxr_rbuf[j].buffer) + i * trxr_rbuf[0].len), trxr_rbuf[0].len * sizeof(Complex), 1, f); } fclose(f); log_exit("", "Finished tracing"); } } static void *recv_thread(void *p) { cpu_set_t mask; TRXEcpriState * s = (TRXEcpriState *) p; #ifdef DPDK int first_seq_id = 1; #else int64_t target_counter = 0; struct timespec current, previous; #endif log_info("RECV_THREAD", "Thread init"); // Set thread CPU affinity CPU_ZERO(&mask); CPU_SET(s->recv_affinity, &mask); if (sched_setaffinity(0, sizeof(mask), &mask)) error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->recv_affinity); #define RTE_MBUF_SIZE 20000 #define MIN_RX 10000 for(int64_t i = 0;; i++) { struct rte_mbuf * pkt[RTE_MBUF_SIZE]; uint8_t * buf; #ifdef DPDK uint8_t * rtebuf; int port = 0; #endif int nb_rx = 0; int n; int drop_packet = 0; int drop_total = 0; #ifdef DPDK while(!nb_rx) nb_rx = rte_eth_rx_burst(port, 0, pkt + nb_rx, 1024); #else // Limit packets sent if(recv_counter.counter >= target_counter) { clock_gettime(CLOCK_TAI, ¤t); if(!i || calcdiff_ns(current, previous) >= (1000 * 1000 * 10)) { target_counter += s->frame_frequency / 100; previous = current; } } if(recv_counter.counter < target_counter) { nb_rx = 1024; usleep(200); } else continue; #endif 
if(nb_rx > RTE_MBUF_SIZE) log_exit("RECV_THREAD", "nb_rx (%d) > RTE_MBUF_SIZE (%d)", nb_rx, RTE_MBUF_SIZE); received_pkts = 1; n = rbuf_write_amount(&rx_rbuf); drop_packet = nb_rx > n; if(drop_packet) { for(int i = 0; i < nb_rx; i++) rte_pktmbuf_free(pkt[i]); if(nb_rx) update_counter(&rx_drop_counter, nb_rx); } else { int nc; int nr; int k = 0; nr = nb_rx; while((nc = rbuf_contiguous_copy(NULL, &rx_rbuf, nr))) { int drop = 0; if(s->trace_rx) { if((recv_counter.counter + nc) >= (rx_rbuf.buf_len + s->trace_offset)) { rx_trace_ready = 1; log_info("RECV_THREAD", "RX Trace ready"); pthread_exit(EXIT_SUCCESS); } else if (rx_trace_ready) { pthread_exit(EXIT_SUCCESS); } } buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.write_index * rx_rbuf.len); for(int i = 0; i < nc; i++) { #ifdef DPDK rtebuf = (uint8_t *) (pkt[i + k])->buf_addr + (pkt[i + k])->data_off; if(first_seq_id) { uint16_t seq_id = htons(((uint16_t *) (rtebuf + 20))[0]); printf("seq_id = %d\n", seq_id); first_seq_id = 0; } if(s->master == 1) { uint16_t _rx_seq_id; uint16_t seq_id = htons(((uint16_t *) (rtebuf + 20))[0]); rx_seq_id++; _rx_seq_id = rx_seq_id % 65536; if(_rx_seq_id != seq_id) { if(_rx_seq_id > seq_id) rx_seq_id += 65536 + seq_id - _rx_seq_id; else rx_seq_id += seq_id - _rx_seq_id; } } if((pkt[i + k])->data_len != rx_rbuf.len) { for(int j = 0; j < pkt[i + k]->data_len; j++) { printf("%02x", rtebuf[j]); } printf("\n"); log_error("RECV", "Packet data length (%d) != RX buffer len (%d)", pkt[i + k]->data_len, rx_rbuf.len); } if(ntohl(*((uint32_t*) (rtebuf + 12))) != 0xaefe) { drop++; drop_total++; continue; } memcpy(buf + i * rx_rbuf.len, rtebuf, rx_rbuf.len); #else //memcpy(buf + i * rx_rbuf.len, pkt_frame_full, rx_rbuf.len); #endif } rx_rbuf.write_index = (rx_rbuf.write_index + nc - drop) % rx_rbuf.buf_len; for(int i = 0; i < nc; i++) rte_pktmbuf_free(pkt[i + k]); nr -= nc; k += nc; } } update_counter(&recv_counter, nb_rx - drop_total); } pthread_exit(EXIT_SUCCESS); } // Send as soon as packets are 
encoded // Signal to encode thread that packets has been sent static void *send_thread(void *p) { cpu_set_t mask; struct timespec initial; TRXEcpriState * s = (TRXEcpriState *) p; log_info("SEND_THREAD", "Thread init"); // Set thread CPU affinity CPU_ZERO(&mask); CPU_SET(s->send_affinity, &mask); if (sched_setaffinity(0, sizeof(mask), &mask)) error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->send_affinity); clock_gettime(CLOCK_TAI, &initial); for(int64_t i = 1;; i++) { int64_t n = rbuf_read_amount(&tx_rbuf); if(n >= BURST_SIZE) { int nb_burst = n / BURST_SIZE; for(int j = 0; j < nb_burst; j++) { #ifdef DPDK struct rte_mbuf * pkt[TX_POOL_SIZE]; struct rte_ether_hdr *eth_hdr; uint16_t nb_tx = 0; for(int i = 0; i < TX_POOL_SIZE; i++) { int pkt_size; pkt[i] = rte_pktmbuf_alloc(tx_mbuf_pool); eth_hdr = rte_pktmbuf_mtod(pkt[i], struct rte_ether_hdr*); eth_hdr->d_addr = d_addr; eth_hdr->s_addr = s_addr; eth_hdr->ether_type = htons(0xaefe); memcpy(rte_pktmbuf_mtod_offset(pkt[i], uint8_t *, sizeof(struct rte_ether_hdr)), RBUF_READ0(tx_rbuf, uint8_t), tx_rbuf.len); rbuf_update_read_index(&tx_rbuf); pkt_size = 14 + tx_rbuf.len; pkt[i]->data_len = pkt_size; pkt[i]->pkt_len = pkt_size; } while(nb_tx < TX_POOL_SIZE) { int64_t x = TX_POOL_SIZE - nb_tx; nb_tx += rte_eth_tx_burst(0, 0, pkt + nb_tx, x > BURST_SIZE ? BURST_SIZE : x); } /* Free any unsent packets. 
*/ if (nb_tx < BURST_SIZE) { uint16_t buf; for (buf = nb_tx; buf < BURST_SIZE; buf++) rte_pktmbuf_free(pkt[buf]); log_exit("SEND_THREAD", "Sent %d packets instead of %d", nb_tx, BURST_SIZE); } #else for(int i = 0; i < 3000; i++) asm("NOP"); #endif } update_counter(&sent_counter, nb_burst * BURST_SIZE); } } pthread_exit(EXIT_SUCCESS); } static int one_way_delay_measure(uint8_t ** buf) { struct timespec t; clock_gettime(CLOCK_TAI, &t); one_way_timestamp = ts_to_int(t); *( (*buf - 7)) = 3; // eCPRI Type *( (*buf - 4)) = 0; // Measurement ID *( (*buf - 3)) = 0; // Action Type *((uint64_t *)(*buf - 2)) = one_way_timestamp; // Timestamp *((uint64_t *)(*buf + 7)) = 0; // Compensation *buf += tx_rbuf.len; return 1; } static void encode_empty_frames(int n, int trx, TRXEcpriState * s) { uint8_t * buf = RBUF_WRITE0(tx_rbuf, uint8_t) + 8; int n2 = n; for(int i = 0; i < n; i++) { // ONE WAY DELAY MEASURE if(s->one_way_measure && (encode_counter.counter + i) % s->one_way_period) n2 += one_way_delay_measure(&buf); // TDD FRAME START if(s->tdd_frame_start) *((uint8_t *)(buf + 60 * s->tx_n_channel)) = (trx && !((trx_encode_counter.counter + i) % s->tdd_period)); // TIMESTAMP if(s->timestamp_frames) { if(s->master) *((int64_t *)(buf + 60 * s->tx_n_channel)) = gettime(0, sync_encode_counter++ + rxtx_shift, s->frame_frequency); else *((int64_t *)(buf + 60 * s->tx_n_channel)) = gettime(tx_initial_ts, sync_encode_counter++, s->frame_frequency); } memset(buf, 0x00, 60 * s->tx_n_channel); *((uint16_t *)(buf - 2)) = htons(tx_seq_id++); buf += tx_rbuf.len; } tx_rbuf.write_index = (tx_rbuf.write_index + n2) % tx_rbuf.buf_len; if(trx) trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + n) % trxw_rbuf[0].buf_len; } static void encode_iq_samples(int n, TRXEcpriState * s) { int nc; int nf = n; while((nc = rbuf_contiguous_copy(&trxw_rbuf[0], &tx_rbuf, nf))) { Complex * iq_samples[4]; uint8_t * buf = RBUF_WRITE0(tx_rbuf, uint8_t) + 8; int nc2 = nc; for(int j = 0; j < s->tx_n_channel; j++) 
iq_samples[j] = ((Complex *) trxw_rbuf[j].buffer) + (trxw_rbuf[0].read_index * trxw_rbuf[0].len); for(int i = 0; i < nc; i++) { // ONE WAY DELAY MEASURE if(s->one_way_measure && (encode_counter.counter + i) % s->one_way_period) nc2 += one_way_delay_measure(&buf); // TDD FRAME START if(s->tdd_frame_start) *((uint8_t *)(buf + 60 * s->tx_n_channel)) = !((trx_encode_counter.counter + i) % s->tdd_period); // TIMESTAMP if(s->timestamp_frames) { if(s->master) *((int64_t *)(buf + 60 * s->tx_n_channel)) = gettime(0, sync_encode_counter++ + rxtx_shift, s->frame_frequency); else *((int64_t *)(buf + 60 * s->tx_n_channel)) = gettime(tx_initial_ts, sync_encode_counter++, s->frame_frequency); } for(int i = 0; i < s->tx_n_channel ; i++) encode_s64_b60_2(buf + i * 60, (float *) iq_samples[i]); *((uint16_t *)(buf - 2)) = htons(tx_seq_id++); for(int j = 0; j < s->tx_n_channel; j++) iq_samples[j] += trxw_rbuf[0].len; buf += tx_rbuf.len; } tx_rbuf.write_index = (tx_rbuf.write_index + nc2) % tx_rbuf.buf_len; trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + nc) % trxw_rbuf[0].buf_len; nf -= nc; } if(nf) exit(EXIT_FAILURE); } int encode_trx(int n_max, TRXEcpriState * s) { int remain = n_max; while(remain && rbuf_read_amount(&trxw_rbuf[0]) && rbuf_read_amount(&trxw_group_rbuf)) { sample_group_t * g; int n_trx; g = RBUF_READ0(trxw_group_rbuf, sample_group_t); if(g->wait) { g->wait = 0; g->count -= trx_encode_counter.counter; g->zeroes = 1; } n_trx = g->count > remain ? 
remain : g->count; g->count -= n_trx; if(s->trace_tx) { if(sync_complete && (encode_counter.counter + n_trx) >= (tx_rbuf.buf_len + s->trace_offset)) { tx_trace_ready = 1; log_info("ENCODE_THREAD", "TX Trace ready"); pthread_exit(EXIT_SUCCESS); } else if (tx_trace_ready) { pthread_exit(EXIT_SUCCESS); } } if(g->zeroes) encode_empty_frames(n_trx, 1, s); else encode_iq_samples(n_trx, s); if(!g->count) { rbuf_update_read_index(&trxw_group_rbuf); } update_counter(&trx_encode_counter, n_trx); remain -= n_trx; } return (n_max - remain); } /* If sync has happenned (=we have received frames): Prepare as soon as TRX has packet to write Signal Else: Prepare as soon as there is space in tx buffer */ #define TX_SYNC_BURST_SIZE 512 #define MASTER_BURST 1024 static void *encode_thread(void *p) { cpu_set_t mask; TRXEcpriState * s = (TRXEcpriState *) p; int64_t target_counter = 0; struct timespec next; // Set thread CPU affinity CPU_ZERO(&mask); CPU_SET(s->encode_affinity, &mask); if (sched_setaffinity(0, sizeof(mask), &mask)) error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->encode_affinity); // Slave mode: first TS uses clock_gettime, then add fixed amount to timestamp // Master mode: Computes TX RX offset using received TS to set the TS // At each beginning of TDD frame (or less often), send empty frames to align with RX for(int64_t i = 0;; i++) { int n, n_trx; int n_empty = 0; int n_min, n_max; if(s->master && !i) { clock_gettime(CLOCK_TAI, &next); tx_initial_ts = ts_to_int(next); add_ns(&next, (((int64_t) MASTER_BURST) * NSEC_PER_SEC) / s->frame_frequency); } n = rbuf_write_amount(&tx_rbuf); n_min = s->encode_burst; n_min += s->one_way_measure ? (s->encode_burst / s->one_way_period) + 1 : 0; if(s->master && n < n_min) log_exit("ENCODE_THREAD", "Not enough space in TX RBUF (%d < %d)\n", n, s->encode_burst); n_max = s->encode_burst ? 
s->encode_burst : n; n_trx = encode_trx(n_max, s); if(s->master) { struct timespec current; n_empty = s->encode_burst - n_trx; encode_empty_frames(n_empty, 0, s); target_counter += s->encode_burst; next = int_to_ts(gettime(tx_initial_ts, target_counter, s->frame_frequency)); do { clock_gettime(CLOCK_TAI, ¤t); } while(current.tv_sec < next.tv_sec || (current.tv_sec == next.tv_sec && current.tv_nsec < next.tv_nsec)); } update_counter(&encode_counter, n_trx + n_empty); update_counter(&empty_encode_counter, n_empty); } pthread_exit(EXIT_SUCCESS); } static void *decode_thread(void *p) { cpu_set_t mask; TRXEcpriState * s = (TRXEcpriState *) p; struct timespec next; int64_t target_counter = 0; int64_t sync_decode_counter = 0; int reset_decode_counter = 1; log_info("DECODE_THREAD", "Thread init"); // Set thread CPU affinity CPU_ZERO(&mask); CPU_SET(s->decode_affinity, &mask); if (sched_setaffinity(0, sizeof(mask), &mask)) error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->decode_affinity); for(int64_t i = 0;; i++) { int n, nc; if(s->start_receiving) { if(!received_pkts) { if(i == 0) clock_gettime(CLOCK_TAI, &next); // Limit packets sent if(decode_counter.counter > target_counter) { int k = (decode_counter.counter - target_counter + (s->frame_frequency / 100) - 1) / (s->frame_frequency / 100); add_ns(&next, k * 1000 * 1000 * 10); // 10ms to send 38400 packets clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL); target_counter += k * (s->frame_frequency / 100); } n = (s->frame_frequency / 100); for(int j = 0; j < n; j++) rbuf_update_write_index(&trxr_rbuf[0]); update_counter(&decode_counter, n); continue; } else if (reset_decode_counter) { if(s->trace_rx) decode_counter_prev = decode_counter.counter; decode_counter.counter = 0; reset_decode_counter = 0; } } while(!(n = rbuf_read_amount(&rx_rbuf))) {}; while(rbuf_write_amount(&trxr_rbuf[0]) < n) {}; while((nc = rbuf_contiguous_copy(&rx_rbuf, &trxr_rbuf[0], n))) { uint8_t * buf = ((uint8_t *) 
rx_rbuf.buffer) + (rx_rbuf.read_index * rx_rbuf.len) + 22; int one_way_count = 0; if(s->trace_rx) { if(received_pkts && ((decode_counter.counter + nc) >= (trxr_rbuf[0].buf_len + s->trace_offset))) { rx_trace_ready = 1; log_info("DECODE_THREAD", "RX Trace ready"); pthread_exit(EXIT_SUCCESS); } else if (rx_trace_ready) { pthread_exit(EXIT_SUCCESS); } } Complex * iq_samples[4]; for(int i = 0; i < s->rx_n_channel; i++) iq_samples[i] = (((Complex *) trxr_rbuf[i].buffer) + (trxr_rbuf[0].write_index * trxr_rbuf[0].len)); for(int i = 0; i < nc; i++) { // ONE WAY MEASURE if(s->one_way_measure && *((buf + i * rx_rbuf.len) - 7) == 3) { uint64_t timestamp = (uint64_t) *(buf + 7); *(RBUF_WRITE0(one_way_rbuf, int64_t)) = ((int64_t) timestamp) - ((int64_t) one_way_timestamp); rbuf_update_write_index(&one_way_rbuf); continue; } // TIMESTAMP if(s->master && s->timestamp_frames && !(sync_decode_counter % s->master_timestamp_check_period)) { int64_t timestamp = (int64_t) *(buf + s->rx_n_channel * 60); rxtx_shift = gettime(0, timestamp, s->frame_frequency) - sync_decode_counter; } if(!s->master && s->timestamp_frames && !(sync_decode_counter % s->slave_timestamp_check_period)) { struct timespec t; int64_t timestamp = (int64_t) *(buf + s->rx_n_channel * 60); clock_gettime(CLOCK_TAI, &t); slave_ts_offset = ts_to_int(t) - timestamp; } for(int j = 0; j < s->rx_n_channel ; j++) { decode_s64_b60_2((float *) (iq_samples[j] + i * 32), buf + j * 60 + i * rx_rbuf.len); } sync_decode_counter++; } trxr_rbuf[0].write_index = (trxr_rbuf[0].write_index + nc) % trxr_rbuf[0].buf_len; rx_rbuf.read_index = (rx_rbuf.read_index + nc) % rx_rbuf.buf_len; n -= nc; update_counter(&decode_counter, nc); } } pthread_exit(EXIT_SUCCESS); } static void *statistic_thread(void *p) { struct timespec next, initial; int64_t recv_stop = 0; cpu_set_t mask; TRXEcpriState * s = (TRXEcpriState *) p; FILE * stats_file_desc; log_info("STATISTIC_THREAD", "Thread init"); char stats_file_name[256]; memset(stats_file_name, '\0', 
256); sprintf(stats_file_name, "%s/ecpri.stats", s->log_directory); stats_file_desc = fopen(stats_file_name, "w+"); if(!stats_file_desc) error(EXIT_FAILURE, errno, "Couldn't open %s\n", stats_file_name); // Set thread CPU affinity CPU_ZERO(&mask); CPU_SET(s->statistic_affinity, &mask); if (sched_setaffinity(0, sizeof(mask), &mask)) error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->statistic_affinity); clock_gettime(CLOCK_TAI, &initial); next = initial; for(int64_t i = 0;; i++) { add_ns(&next, s->statistics_refresh_rate_ns); if(s->trace_rx || s->trace_tx) trace_handler(initial, s); print_stats(stats_file_desc, (i % 50) == 0); #ifdef DEBUG fprintf(stats_file_desc, "%d %d %d %d %d %d %d %d\n", rx_rbuf.write_index, rx_rbuf.read_index, trxr_rbuf[0].write_index, trxr_rbuf[0].read_index, trxw_rbuf[0].write_index, trxw_rbuf[0].read_index, tx_rbuf.write_index, tx_rbuf.read_index); fprintf(stats_file_desc, "TRXW RBUF: ri %d wi %d ra %d wa %d\n", trxw_rbuf[0].read_index, trxw_rbuf[0].write_index, rbuf_read_amount(&trxw_rbuf[0]), rbuf_write_amount(&trxw_rbuf[0])); #endif fflush(stats_file_desc); update_counter_pps(&rx_drop_counter); update_counter_pps(&tx_drop_counter); update_counter_pps(&recv_counter); update_counter_pps(&decode_counter); update_counter_pps(&read_counter); update_counter_pps(&write_counter); update_counter_pps(&encode_counter); update_counter_pps(&trx_encode_counter); update_counter_pps(&sent_counter); update_counter_pps(&empty_encode_counter); if(s->monitor_pps) { if(recv_counter.pps > recv_pps_threshold) { recv_pps_threshold_hit = 1; } if(recv_pps_threshold_hit && recv_counter.pps < recv_pps_threshold) { struct timespec _ts; int64_t ts; clock_gettime(CLOCK_MONOTONIC, &_ts); ts = ts_to_int(_ts); if((recv_stop && ((ts - recv_stop) > (s->monitor_trigger_duration) * INT64_C(1000000000)))) { if(s->monitor_pps == 1) log_exit("MONITOR", "Stopped recieving packets, restarting..."); log_info("MONITOR", "Stopped recieving packets, sending 
again..."); sync_complete = 0; recv_stop = 0; } if(!recv_stop) recv_stop = ts; } } clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL); } pthread_exit(EXIT_SUCCESS); } static int start_threads(TRXEcpriState * s) { pthread_t recv_pthread; pthread_t send_pthread; pthread_t encode_pthread; pthread_t decode_pthread; pthread_t statistic_pthread; struct sched_param recv_param; struct sched_param send_param; struct sched_param encode_param; struct sched_param decode_param; struct sched_param statistic_param; pthread_attr_t recv_attr; pthread_attr_t send_attr; pthread_attr_t encode_attr; pthread_attr_t decode_attr; pthread_attr_t statistic_attr; log_info("TRX_ECPRI", "Starting threads"); // Initialize pthread attributes (default values) if (pthread_attr_init(&recv_attr)) log_error("TRX_ECPRI", "init pthread attributes failed\n"); // Set a specific stack size if (pthread_attr_setstacksize(&recv_attr, PTHREAD_STACK_MIN)) log_error("TRX_ECPRI", "pthread setstacksize failed\n"); // Set scheduler policy and priority of pthread if (pthread_attr_setschedpolicy(&recv_attr, SCHED_FIFO)) log_error("TRX_ECPRI", "pthread setschedpolicy failed\n"); recv_param.sched_priority = 97; if (pthread_attr_setschedparam(&recv_attr, &recv_param)) log_error("TRX_ECPRI", "pthread setschedparam failed\n"); /* Use scheduling parameters of attr */ if (pthread_attr_setinheritsched(&recv_attr, PTHREAD_EXPLICIT_SCHED)) log_error("TRX_ECPRI", "pthread setinheritsched failed\n"); if (pthread_attr_init(&send_attr)) log_error("TRX_ECPRI", "init pthread attributes failed\n"); if (pthread_attr_setstacksize(&send_attr, PTHREAD_STACK_MIN)) log_error("TRX_ECPRI", "pthread setstacksize failed\n"); if (pthread_attr_setschedpolicy(&send_attr, SCHED_FIFO)) log_error("TRX_ECPRI", "pthread setschedpolicy failed\n"); send_param.sched_priority = 97; if (pthread_attr_setschedparam(&send_attr, &send_param)) log_error("TRX_ECPRI", "pthread setschedparam failed\n"); if (pthread_attr_setinheritsched(&send_attr, 
PTHREAD_EXPLICIT_SCHED)) log_error("TRX_ECPRI", "pthread setinheritsched failed\n"); if (pthread_attr_init(&encode_attr)) log_error("TRX_ECPRI", "init pthread attributes failed\n"); if (pthread_attr_setstacksize(&encode_attr, PTHREAD_STACK_MIN)) log_error("TRX_ECPRI", "pthread setstacksize failed\n"); if (pthread_attr_setschedpolicy(&encode_attr, SCHED_FIFO)) log_error("TRX_ECPRI", "pthread setschedpolicy failed\n"); encode_param.sched_priority = 97; if (pthread_attr_setschedparam(&encode_attr, &encode_param)) log_error("TRX_ECPRI", "pthread setschedparam failed\n"); if (pthread_attr_setinheritsched(&encode_attr, PTHREAD_EXPLICIT_SCHED)) log_error("TRX_ECPRI", "pthread setinheritsched failed\n"); if (pthread_attr_init(&decode_attr)) log_error("TRX_ECPRI", "init pthread attributes failed\n"); if (pthread_attr_setstacksize(&decode_attr, PTHREAD_STACK_MIN)) log_error("TRX_ECPRI", "pthread setstacksize failed\n"); if (pthread_attr_setschedpolicy(&decode_attr, SCHED_FIFO)) log_error("TRX_ECPRI", "pthread setschedpolicy failed\n"); decode_param.sched_priority = 97; if (pthread_attr_setschedparam(&decode_attr, &decode_param)) log_error("TRX_ECPRI", "pthread setschedparam failed\n"); if (pthread_attr_setinheritsched(&decode_attr, PTHREAD_EXPLICIT_SCHED)) log_error("TRX_ECPRI", "pthread setinheritsched failed\n"); if (pthread_attr_init(&statistic_attr)) log_error("TRX_ECPRI", "init pthread attributes failed\n"); if (pthread_attr_setstacksize(&statistic_attr, PTHREAD_STACK_MIN)) log_error("TRX_ECPRI", "pthread setstacksize failed\n"); if (pthread_attr_setschedpolicy(&statistic_attr, SCHED_FIFO)) log_error("TRX_ECPRI", "pthread setschedpolicy failed\n"); statistic_param.sched_priority = 97; if (pthread_attr_setschedparam(&statistic_attr, &statistic_param)) log_error("TRX_ECPRI", "pthread setschedparam failed\n"); if (pthread_attr_setinheritsched(&statistic_attr, PTHREAD_EXPLICIT_SCHED)) log_error("TRX_ECPRI", "pthread setinheritsched failed\n"); if 
(pthread_create(&statistic_pthread, NULL, statistic_thread, s)) error(EXIT_FAILURE, errno, "Couldn't create statistic thread"); usleep(1000 * 20); if (pthread_create(&encode_pthread, NULL, encode_thread, s)) error(EXIT_FAILURE, errno, "Couldn't create encode thread"); usleep(1000 * 20); if (pthread_create(&decode_pthread, NULL, decode_thread, s)) error(EXIT_FAILURE, errno, "Couldn't create decode thread"); usleep(1000 * 20); if (pthread_create(&send_pthread, NULL, send_thread, s)) error(EXIT_FAILURE, errno, "Couldn't create send thread"); usleep(1000 * 500); if (pthread_create(&recv_pthread, NULL, recv_thread, s)) error(EXIT_FAILURE, errno, "Couldn't create recv thread"); return 0; } int startdpdk(TRXEcpriState * s) { uint8_t ecpri_message[MAX_PACKET_SIZE]; int argc = 1; int k = 1; int prev_space = -1; char ** argv; #ifndef DPDK for(int i = 0; i < 262; i++) pkt_frame_full[i] = 0xff; #endif for(int i = 0;; i++) { if(s->dpdk_options[i] == ' ') argc++; else if(s->dpdk_options[i] == '\0') break; } argv = (char **) malloc(sizeof(char *) * argc); for(int i = 0;; i++) { if(s->dpdk_options[i] == ' ') { argv[k] = (char *) malloc(i - prev_space); strncpy(argv[k], s->dpdk_options + prev_space + 1, i - prev_space -1); argv[k][i - prev_space-1] = '\0'; prev_space = i; k++; } else if(s->dpdk_options[i] == '\0') { break; } } argv[0] = ""; #ifdef DPDK init_dpdk(argc, argv); #endif log_info("TRX_ECPRI", "Start"); //set_latency_target(); tx_seq_id = 0; init_counter(&rx_drop_counter); init_counter(&tx_drop_counter); init_counter(&recv_counter); init_counter(&decode_counter); init_counter(&read_counter); init_counter(&write_counter); init_counter(&encode_counter); init_counter(&trx_encode_counter); init_counter(&sent_counter); init_counter(&empty_encode_counter); RBUF_INIT(rx_rbuf, "RX ring buffer", s->txrx_buf_size, s->rx_n_channel * 60 + 22 + s->tdd_frame_start + s->timestamp_frames * sizeof(uint64_t), uint8_t); RBUF_INIT(tx_rbuf, "TX ring buffer", s->txrx_buf_size, s->tx_n_channel 
* 60 + 8 + s->tdd_frame_start + s->timestamp_frames * sizeof(uint64_t), uint8_t); for(int i = 0; i < s->tx_n_channel; i++) { char name[256]; sprintf(name, "TRXWrite Ring Buffer %d", i); RBUF_INIT(trxw_rbuf[i], name, s->trx_buf_size, N_SAMPLES, Complex); } for(int i = 0; i < s->rx_n_channel; i++) { char name[256]; sprintf(name, "TRXRead Ring Buffer %d", i); RBUF_INIT(trxr_rbuf[i], name, s->trx_buf_size, N_SAMPLES, Complex); } RBUF_INIT(trxw_group_rbuf, "TRXGroupWrite ring buffer", TRX_MAX_GROUP, 1, sample_group_t); RBUF_INIT(one_way_rbuf, "One-way ring buffer", 1024, 1, int64_t); memset((uint8_t *) ecpri_message, 0, MAX_PACKET_SIZE); #ifdef DPDK if(sscanf((char *) s->re_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c", &d_addr.addr_bytes[0], &d_addr.addr_bytes[1], &d_addr.addr_bytes[2], &d_addr.addr_bytes[3], &d_addr.addr_bytes[4], &d_addr.addr_bytes[5]) != 6) fprintf(stderr, "Invalid eRE MAC address\n"); if(sscanf((char *) s->rec_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c", &s_addr.addr_bytes[0], &s_addr.addr_bytes[1], &s_addr.addr_bytes[2], &s_addr.addr_bytes[3], &s_addr.addr_bytes[4], &s_addr.addr_bytes[5]) != 6) fprintf(stderr, "Invalid eREC MAC address\n"); #endif /* Standard Header */ ecpri_message[0] = 0x10; // Protocol data revision 0x1, C = 0 // Message type = 0x00, IQ data // Payload size *((uint16_t *) (ecpri_message + 2)) = htons(244); *((uint16_t *) (ecpri_message + 4)) = htons(s->flow_id); for(int i = 0; i < tx_rbuf.buf_len; i++) memcpy(((uint8_t *) tx_rbuf.buffer) + (i * tx_rbuf.len), ecpri_message, tx_rbuf.len); start_threads(s); return 0; } static void trx_ecpri_end(TRXState *s1) { log_info("TRX_ECPRI", "End"); TRXEcpriState *s = s1->opaque; free(s); } static int64_t prev_ts = 0; static int64_t prev_count = 0; #define M 32 static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void **__samples, int count, int tx_port_index, TRXWriteMetadata *md) { int write_count; int64_t ts; sample_group_t * g; int nc; int nk = 0; float ** _samples = (float 
**) __samples; TRXEcpriState *s = s1->opaque; write_count = count / M; ts = timestamp / M; if(count) min_count_trx = count < min_count_trx ? count : min_count_trx; log_debug("TRX_ECPRI_WRITE", "trx_ecpri_write, count = %ld", count); if(prev_count && ((ts - prev_ts) != prev_count)) { log_exit("TRX_ECPRI_WRITE", "Gap between timestamps: prev_ts %li ts %li prev_count %li count %li diff_ts %li", prev_ts, ts, prev_count, count, (ts - prev_ts)); } prev_ts = ts; prev_count = write_count; if(write_count > rbuf_write_amount(&trxw_rbuf[0])) { //log_exit("TRX_ECPRI_WRITE", "Not enough space to write in trxw_rbuf (write count = %d)", write_count); update_counter(&tx_drop_counter, write_count); return; } if(s->trace_tx) { if((write_counter.counter + write_count) >= (trxw_rbuf[0].buf_len + s->trace_offset)) { tx_trace_ready = 1; log_info("TRX_ECPRI_WRITE", "TX Trace ready"); pthread_exit(EXIT_SUCCESS); } else if (tx_trace_ready) { pthread_exit(EXIT_SUCCESS); } } if(first_trx_write) { sample_group_t * g2 = RBUF_WRITE0(trxw_group_rbuf, sample_group_t); g2->count = ts; g2->wait = 1; g2->zeroes = 1; rbuf_update_write_index(&trxw_group_rbuf); } g = RBUF_WRITE0(trxw_group_rbuf, sample_group_t); g->zeroes = __samples ? 
0 : 1; g->wait = 0; g->count = write_count; while((nc = rbuf_contiguous_copy(NULL, &trxw_rbuf[0], write_count))) { int len = nc * trxr_rbuf[0].len * sizeof(Complex); if(__samples) for(int i = 0; i < s->tx_n_channel; i++) { uint8_t * src = ((uint8_t*) _samples[i]) + (nk * trxr_rbuf[0].len * sizeof(Complex)); uint8_t * dst = ((uint8_t *) trxw_rbuf[i].buffer) + trxw_rbuf[0].write_index * trxw_rbuf[0].len * sizeof(Complex); memcpy(dst, src, len); } trxw_rbuf[0].write_index = (trxw_rbuf[0].write_index + nc) % trxw_rbuf[0].buf_len; write_count -= nc; nk += nc; } rbuf_update_write_index(&trxw_group_rbuf); update_counter(&write_counter, count / M); } static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__samples, int count, int rx_port_index, TRXReadMetadata *md) { int nc; int n; float ** _samples = (float **) __samples; int read_count = (count / M); int offset = 0; TRXEcpriState *s = s1->opaque; log_debug("TRX_ECPRI_READ", "count = %ld (%li)", read_count, read_counter.counter); while(rbuf_read_amount(&trxr_rbuf[0]) < read_count); sync_complete = 1; n = read_count; while((nc = rbuf_contiguous_copy(&trxr_rbuf[0], NULL, n))) { int len = nc * trxr_rbuf[0].len * sizeof(Complex); for(int i = 0; i < s->rx_n_channel; i++ ) { uint8_t * dst = (uint8_t*) (_samples[i] + offset); uint8_t * src = ((uint8_t *) trxr_rbuf[i].buffer) + trxr_rbuf[0].read_index * trxr_rbuf[0].len * sizeof(Complex); if(!s->disable_rx_copy && !s->trx_read_null) memcpy(dst, src, len); } trxr_rbuf[0].read_index = (trxr_rbuf[0].read_index + nc) % trxr_rbuf[0].buf_len; n -= nc; offset += len; } *ptimestamp = (read_counter.counter) * M; update_counter(&read_counter, read_count); return count; } /* This function can be used to automatically set the sample rate. 
Here we don't implement it, so the user has to force a given sample rate with the "sample_rate" configuration option */ static int trx_ecpri_get_sample_rate(TRXState *s1, TRXFraction *psample_rate, int *psample_rate_num, int sample_rate_min) { return -1; } static int trx_ecpri_start(TRXState *s1, const TRXDriverParams *params) { TRXEcpriState *s = s1->opaque; log_info("TRX_ECPRI_START", "Start"); log_info("TRX_ECPRI_START", "trx_api_version: %d", s1->trx_api_version); log_info("TRX_ECPRI_START", "config file: %s", s1->path); log_info("TEST-DPDK-ECPRI", "rec-mac: %s, re-mac: %s, rec-if: %s", s->rec_mac, s->re_mac, s->rec_if); s->sample_rate = params->sample_rate[0].num / params->sample_rate[0].den; startdpdk(s); return 0; } void dummy_enb_init(TRXState *s1, TRXEcpriState *s) { s1->trx_write_func2 = trx_ecpri_write; s1->trx_read_func2 = trx_ecpri_read; startdpdk(s); } /* Called to start the tranceiver. Return 0 if OK, < 0 if */ int trx_start_func(TRXState *s, const TRXDriverParams *p) { log_info("DEBUG", "trx_start_func"); return 0; } /* Deprecated, use trx_write_func2 instead. Write 'count' samples on each channel of the TX port 'tx_port_index'. samples[0] is the array for the first channel. timestamp is the time (in samples) at which the first sample must be sent. When the TRX_WRITE_FLAG_PADDING flag is set, samples is set to NULL. It indicates that no data should be sent (TDD receive time). TRX_WRITE_FLAG_END_OF_BURST is set to indicate in advance that the next write call will have the TRX_WRITE_FLAG_PADDING flag set. Note: TRX_WRITE_FLAG_END_OF_BURST and TRX_WRITE_FLAG_PADDING are never set simultaneously. */ void trx_write_func(TRXState *s, trx_timestamp_t timestamp, const void **samples, int count, int flags, int tx_port_index) { log_info("DEBUG", "**"); } /* Deprecated, use trx_read_func2 instead. Read 'count' samples from each channel. samples[0] is the array for the first channel. *ptimestamp is the time at which the first samples was received. 
Return the number of sample read (=count). Note: It is explicitely allowed that the application calls trx_write_func, trx_read_func, trx_set_tx_gain_func and trx_set_rx_gain_func from different threads. */ int trx_read_func(TRXState *s, trx_timestamp_t *ptimestamp, void **samples, int count, int rx_port_index) { log_info("DEBUG", "**"); return 0; } /* Dynamic set the transmit gain (in dB). The origin and range are driver dependent. Note: this function is only used for user supplied dynamic adjustements. */ void trx_set_tx_gain_func(TRXState *s, double gain, int channel_num) { log_info("DEBUG", "trx_set_tx_gain_func"); } /* Dynamic set the receive gain (in dB). The origin and range are driver dependent. Note: this function is only used for user supplied dynamic adjustements. */ void trx_set_rx_gain_func(TRXState *s, double gain, int channel_num) { log_info("DEBUG", "trx_set_rx_gain_func"); } /* Return the maximum number of samples per TX packet. Called by * the application after trx_start_func. * Optional */ int trx_get_tx_samples_per_packet_func(TRXState *s) { log_info("DEBUG", "trx_get_tx_samples_per_packet_func"); return 0; } /* Return some statistics. Return 0 if OK, < 0 if not available. */ int trx_get_stats(TRXState *s, TRXStatistics *m) { log_info("DEBUG", "trx_get_stats"); return 0; } /* Callback must allocate info buffer that will be displayed */ void trx_dump_info(TRXState *s, trx_printf_cb cb, void *opaque) { log_info("DEBUG", "*opaque"); } /* Return the absolute TX power in dBm for the TX channel 'channel_num' assuming a square signal of maximum amplitude. This function can be called from any thread and needs to be fast. Return 0 if OK, -1 if the result is not available. */ int trx_get_abs_tx_power_func(TRXState *s, float *presult, int channel_num) { log_info("DEBUG", "trx_get_abs_tx_power_func"); return 0; } /* Return the absolute RX power in dBm for the RX channel 'channel_num' assuming a square signal of maximum amplitude. 
This function can be called from any thread and needs to be fast. Return 0 if OK, -1 if the result is not available. */ int trx_get_abs_rx_power_func(TRXState *s, float *presult, int channel_num) { log_info("DEBUG", "trx_get_abs_rx_power_func"); return 0; } /* Remote API communication * Available since API v14 * trx_msg_recv_func: called for each trx received messages * trx_msg_send_func: call it to send trx messages (They must be registered by client) * For each message, a call to send API must be done */ void trx_msg_recv_func(TRXState *s, TRXMsg *msg) { log_info("DEBUG", "trx_msg_recv_func"); } TRXMsg* trx_msg_send_func(TRXState *s) { log_info("DEBUG", "trx_msg_send_func"); return NULL; } /* Return actual transmit gain (in dB). The origin and range are driver dependent. */ void trx_get_tx_gain_func(TRXState *s, double *gain, int channel_num) { log_info("DEBUG", "trx_get_tx_gain_func"); } /* Returns actual receive gain (in dB). The origin and range are driver dependent. */ void trx_get_rx_gain_func(TRXState *s, double *gain, int channel_num) { log_info("DEBUG", "trx_get_rx_gain_func"); } /* Stop operation of the transceiver - to be called after trx_start. resources allocated in init are not released, so trx_call can be called again */ void trx_stop_func(TRXState *s) { log_info("DEBUG", "trx_stop_func"); } /* OFDM mode: experimental 7.2 API */ /* read the current timestamp (only used in OFDM mode). Return 0 if OK, < 0 if not supported by device. */ int trx_read_timestamp(TRXState *s, trx_timestamp_t *ptimestamp, int port_index) { log_info("DEBUG", "trx_read_timestamp"); return 0; } void trx_set_tx_streams(TRXState *s, int rf_port_index, int n_streams, const TRXOFDMStreamInfo *streams) { log_info("DEBUG", "trx_set_tx_streams"); } void trx_set_rx_streams(TRXState *s, int rf_port_index, int n_streams, const TRXOFDMStreamInfo *streams) { log_info("DEBUG", "trx_set_rx_streams"); } /* schedule the reading of OFDM symbols. Return 0 if OK, < 0 if error. 
*/ int trx_schedule_read(TRXState *s, int rf_port_index, const TRXScheduledSymbol *symbols, int n_symbols) { log_info("DEBUG", "trx_schedule_read"); return 0; } /* AGC functions */ int trx_set_agc_func(TRXState *s, const TRXAGCParams *p, int channel) { log_info("DEBUG", "trx_set_agc_func"); return 0; } int trx_get_agc_func(TRXState *s, TRXAGCParams *p, int channel) { log_info("DEBUG", "trx_get_agc_func"); return 0; } int trx_driver_init(TRXState *s1) { TRXEcpriState *s; double val; // Lock all current and future pages from preventing of being paged to swap if (mlockall(MCL_CURRENT | MCL_FUTURE)) { log_error("TRX_ECPRI", "mlockall failed"); } log_info("TRX_ECPRI", "Init"); if (s1->trx_api_version != TRX_API_VERSION) { fprintf(stderr, "ABI compatibility mismatch between LTEENB and TRX driver (LTEENB ABI version=%d, TRX driver ABI version=%d)\n", s1->trx_api_version, TRX_API_VERSION); return -1; } s = malloc(sizeof(TRXEcpriState)); memset(s, 0, sizeof(*s)); trx_get_param_double(s1, &val, "recv_affinity"); s->recv_affinity = (int) val; trx_get_param_double(s1, &val, "send_affinity"); s->send_affinity = (int) val; trx_get_param_double(s1, &val, "encode_affinity"); s->encode_affinity = (int) val; trx_get_param_double(s1, &val, "decode_affinity"); s->decode_affinity = (int) val; trx_get_param_double(s1, &val, "statistic_affinity"); s->statistic_affinity = (int) val; trx_get_param_double(s1, &val, "flow_id"); s->flow_id = (int) val; trx_get_param_double(s1, &val, "frame_frequency"); s->frame_frequency = (int) val; trx_get_param_double(s1, &val, "trx_buf_size"); s->trx_buf_size = (int) val; trx_get_param_double(s1, &val, "txrx_buf_size"); s->txrx_buf_size = (int) val; trx_get_param_double(s1, &val, "trace_rx"); s->trace_rx = (int) val; trx_get_param_double(s1, &val, "trace_tx"); s->trace_tx = (int) val; trx_get_param_double(s1, &val, "trace_offset"); s->trace_offset = (int) val; trx_get_param_double(s1, &val, "monitor_pps"); s->monitor_pps = (int) val; 
trx_get_param_double(s1, &val, "monitor_trigger_duration"); s->monitor_trigger_duration = (int) val; trx_get_param_double(s1, &val, "start_receiving"); s->start_receiving = (int) val; trx_get_param_double(s1, &val, "rx_n_channel"); s->rx_n_channel = (int) val; trx_get_param_double(s1, &val, "tx_n_channel"); s->tx_n_channel = (int) val; trx_get_param_double(s1, &val, "master"); s->master = (int) val; trx_get_param_double(s1, &val, "one_way_measure"); s->one_way_measure = (int) val; trx_get_param_double(s1, &val, "one_way_period"); s->one_way_period = (int) val; trx_get_param_double(s1, &val, "tdd_frame_start"); s->tdd_frame_start = (int) val; trx_get_param_double(s1, &val, "timestamp_frames"); s->timestamp_frames = (int) val; trx_get_param_double(s1, &val, "master_timestamp_check_period"); s->master_timestamp_check_period = (int) val; trx_get_param_double(s1, &val, "slave_timestamp_check_period"); s->slave_timestamp_check_period = (int) val; trx_get_param_double(s1, &val, "trx_read_null"); s->trx_read_null = (int) val; trx_get_param_double(s1, &val, "tdd_period"); s->tdd_period = (int) val; trx_get_param_double(s1, &val, "encode_burst"); s->encode_burst = (int) val; trx_get_param_double(s1, &val, "send_burst"); s->send_burst = (int) val; trx_get_param_double(s1, &val, "statistics_refresh_rate_ns"); s->statistics_refresh_rate_ns = (int) val; trx_get_param_double(s1, &val, "ecpri_period"); s->ecpri_period = (int) val; trx_get_param_double(s1, &val, "disable_rx_copy"); s->disable_rx_copy = (int) val; if(s->ecpri_period == 0) log_exit("TRX_ECPRI", "ecpri_period parameter can't be null\n"); if(s->rx_n_channel == 0) log_exit("TRX_ECPRI", "rx_n_channel parameter can't be null\n"); if(s->tx_n_channel == 0) log_exit("TRX_ECPRI", "tx_n_channel parameter can't be null\n"); s->re_mac = (uint8_t *) trx_get_param_string(s1, "re_mac"); s->rec_mac = (uint8_t *) trx_get_param_string(s1, "rec_mac"); s->rec_if = (uint8_t *) trx_get_param_string(s1, "rec_if"); s->dpdk_options = 
trx_get_param_string(s1, "dpdk_options"); s->log_directory = (uint8_t *) trx_get_param_string(s1, "log_directory"); recv_pps_threshold = (s->frame_frequency * 9 / 10); s1->opaque = s; s1->trx_end_func = trx_ecpri_end; s1->trx_write_func2 = trx_ecpri_write; s1->trx_read_func2 = trx_ecpri_read; s1->trx_start_func = trx_ecpri_start; s1->trx_get_sample_rate_func = trx_ecpri_get_sample_rate; //s1->trx_write_func = trx_write_func; //s1->trx_read_func = trx_read_func; //s1->trx_set_tx_gain_func = trx_set_tx_gain_func; //s1->trx_set_rx_gain_func = trx_set_rx_gain_func; //s1->trx_get_tx_samples_per_packet_func = trx_get_tx_samples_per_packet_func; //s1->trx_get_stats = trx_get_stats; //s1->trx_dump_info = trx_dump_info; //s1->trx_get_abs_tx_power_func = trx_get_abs_tx_power_func; //s1->trx_get_abs_rx_power_func = trx_get_abs_rx_power_func; //s1->trx_msg_recv_func = trx_msg_recv_func; //s1->trx_get_tx_gain_func = trx_get_tx_gain_func; //s1->trx_get_rx_gain_func = trx_get_rx_gain_func; //s1->trx_stop_func = trx_stop_func; //s1->trx_read_timestamp = trx_read_timestamp; //s1->trx_set_tx_streams = trx_set_tx_streams; //s1->trx_set_rx_streams = trx_set_rx_streams; //s1->trx_schedule_read = trx_schedule_read; //s1->trx_set_agc_func = trx_set_agc_func; //s1->trx_get_agc_func = trx_get_agc_func; //s1->trx_msg_send_func = trx_msg_send_func; return 0; }