Commit 370ef0bc authored by Joanne Hugé

wip

parent 4e72db49
//#define TEST
#ifdef TEST
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <error.h>
#include <fcntl.h>
#include <getopt.h>
#include <immintrin.h>
#include <inttypes.h>
#include <limits.h>
#include <linux/if_packet.h>
#include <math.h>
#include <netdb.h>
#include <netinet/ether.h>
#include <netinet/in.h>
#include <net/if.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <signal.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#endif
typedef struct {
volatile char * buffer;
volatile size_t * block_buffer;
char name[64];
size_t len;
size_t block_buf_len;
size_t min_block_size;
volatile int write_index;
volatile int read_index;
volatile int block_write_index;
volatile int block_read_index;
} ring_buffer_t;
static void init_rbuf(ring_buffer_t * rbuf, char * name, size_t len, size_t min_block_size) {
//log_debug("TRX_ECPRI", "Allocating %s with %zu bytes\n", name, len);
rbuf->len = len;
rbuf->block_buf_len = len / min_block_size;
rbuf->buffer = (char *) malloc(len);
rbuf->block_buffer = (size_t *) malloc(rbuf->block_buf_len * sizeof(size_t));
rbuf->min_block_size = min_block_size;
rbuf->write_index = 0;
rbuf->read_index = 0;
rbuf->block_write_index = 0;
rbuf->block_read_index = 0;
strcpy(rbuf->name, name);
}
static int rbuf_read_amount(ring_buffer_t * rbuf) {
return (rbuf->write_index + rbuf->len - rbuf->read_index) % rbuf->len;
}
static int rbuf_write_amount(ring_buffer_t * rbuf) {
// Don't write everything to avoid write index catching up to read index
// That way we don't have to use locks
return ((rbuf->read_index + rbuf->len - rbuf->write_index - 1) % rbuf->len);
}
static volatile char * rbuf_write(ring_buffer_t * rbuf) {
return rbuf->buffer + rbuf->write_index;
}
static void rbuf_increment_write(ring_buffer_t * rbuf, size_t size) {
rbuf->write_index = (rbuf->write_index + size) % rbuf->len;
rbuf->block_buffer[rbuf->block_write_index] = size;
rbuf->block_write_index = (rbuf->block_write_index + 1) % rbuf->block_buf_len;
}
static volatile char * rbuf_read(ring_buffer_t * rbuf, size_t * size) {
volatile char * data = rbuf->buffer + rbuf->read_index;
*size = rbuf->block_buffer[rbuf->block_read_index];
rbuf->read_index = (rbuf->read_index + *size) % rbuf->len;
rbuf->block_read_index = (rbuf->block_read_index + 1) % rbuf->block_buf_len;
return data;
}
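/*
 * Minimal single-producer / single-consumer usage sketch (illustrative only,
 * not part of the build; 'payload' and 'consume' are placeholders). The
 * buffer relies on the volatile indices instead of locks, so there must be
 * exactly one writer and one reader, and a written block should not cross
 * the end of the buffer (e.g. keep len a multiple of the block size written):
 *
 *   ring_buffer_t rb;
 *   init_rbuf(&rb, "SKETCH", 4096, 64);
 *
 *   // producer thread
 *   if (rbuf_write_amount(&rb) >= 64) {
 *       memcpy((char *) rbuf_write(&rb), payload, 64);
 *       rbuf_increment_write(&rb, 64);
 *   }
 *
 *   // consumer thread
 *   if (rbuf_read_amount(&rb) > 0) {
 *       size_t n;
 *       volatile char * p = rbuf_read(&rb, &n);
 *       consume((const char *) p, n);
 *   }
 */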
#ifdef TEST
static ring_buffer_t test_rbuf;
int main(int argc, char ** argv) {
char volatile * data;
size_t size;
int i, j;
init_rbuf(&test_rbuf, "TEST", 64, 4);
for(j = 0; j < 10; j++) {
data = rbuf_write(&test_rbuf);
strcpy((char *) data, "hello");
rbuf_increment_write(&test_rbuf, 6);
data = rbuf_write(&test_rbuf);
strcpy((char *) data, "world");
rbuf_increment_write(&test_rbuf, 6);
data = rbuf_read(&test_rbuf, &size);
for(i = 0; i < size; i++) {
printf("%c", data[i]);
}
printf("\n");
data = rbuf_read(&test_rbuf, &size);
for(i = 0; i < size; i++) {
printf("%c", data[i]);
}
printf("\n");
}
return 0;
}
#endif
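/* The TEST harness above can be built standalone (sketch, assuming a POSIX
 * toolchain and that this file is ring_buffer.c):
 *   cc -DTEST -O2 -o ring_buffer_test ring_buffer.c && ./ring_buffer_test
 */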
#ifdef ARCHIVE
static void rbuf_update_write_index(ring_buffer_t * rbuf) {
rbuf->write_index = (rbuf->write_index + 1) % rbuf->buf_len;
}
static void rbuf_update_read_index(ring_buffer_t * rbuf) {
rbuf->read_index = (rbuf->read_index + 1) % rbuf->buf_len;
}
static int rbuf_read_amount(ring_buffer_t * rbuf) {
return (rbuf->write_index + rbuf->buf_len - rbuf->read_index) % rbuf->buf_len;
}
static int rbuf_write_amount(ring_buffer_t * rbuf) {
// Don't write everything to avoid write index catching up to read index
// That way we don't have to use locks
return ((rbuf->read_index + rbuf->buf_len - rbuf->write_index - 1) % rbuf->buf_len);
}
static int rbuf_contiguous_copy(ring_buffer_t * rbuf1, ring_buffer_t * rbuf2, int n) {
int ret = n;
if(rbuf1) {
n = rbuf1->buf_len - rbuf1->read_index;
ret = n < ret ? n : ret;
}
if(rbuf2)
n = rbuf2->buf_len - rbuf2->write_index;
return n < ret ? n : ret;
}
#define RBUF_READ0(rbuf, type) (((type *) rbuf.buffer) + (rbuf.read_index * rbuf.len))
#define RBUF_WRITE0(rbuf, type) (((type *) rbuf.buffer) + (rbuf.write_index * rbuf.len))
#define RBUF_READ(rbuf, i, type) (((type *) rbuf.buffer) + (((rbuf.read_index + i) % rbuf.buf_len) * rbuf.len))
#define RBUF_WRITE(rbuf, i, type) (((type *) rbuf.buffer) + (((rbuf.write_index + i) % rbuf.buf_len) * rbuf.len))
#define RBUF_INIT(rbuf, _name, _buf_len, _len, type) do\
{\
log_debug("TRX_ECPRI", "Allocating %s with %d bytes\n", _name, (_buf_len * _len * sizeof(type)));\
rbuf.buffer = (type *) malloc(_buf_len * _len * sizeof(type));\
strcpy(rbuf.name, _name);\
rbuf.buf_len = _buf_len;\
rbuf.len = _len;\
rbuf.write_index = 0;\
rbuf.read_index = 0;\
} while(0)
#endif
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <error.h>
#include <fcntl.h>
#include <getopt.h>
#include <immintrin.h>
#include <inttypes.h>
#include <limits.h>
#include <linux/if_packet.h>
#include <math.h>
#include <netdb.h>
#include <netinet/ether.h>
#include <netinet/in.h>
#include <net/if.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <signal.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include "private/trx_driver.h"
#define DEBUG
#define SSE4 /* define if CPU supports SSE4.1 */
#include "utils.c"
#include "ring_buffer.c"
/* eCPRI Send and Recv */
#define PACKET_SIZE 262
#define RX_N_CHANNEL 1
#define TX_N_CHANNEL 4
#define FRAME_FREQ INT64_C(3840000) // Basic frame frequency
#define TX_PACKET_SIZE 262
#define RX_MAX_PACKET_SIZE 262
#define TX_ECPRI_PACKET_SIZE (TX_PACKET_SIZE - 14)
#define N_SAMPLES (32)
#define SEND_LIMIT 1250
#define TRX_WB_MAX_PARTS 1000
#define TRX_BUF_MAX_SIZE 1000
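/* Sizes as used further down in this file: each frame is 14 bytes of
 * Ethernet header + 8 bytes of eCPRI header (common header, PC_ID, SEQ_ID)
 * + 4 x 60 = 240 bytes of compressed IQ = 262 bytes; the eCPRI payload size
 * field therefore carries 244 = 262 - 14 - 4 (PC_ID + SEQ_ID + IQ data). */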
/* eCPRI frame structure
Ethernet header: 18 bytes
6 6 4 2
+---------+---------+-----+-----------+
| MAC dst | MAC src | Tag | Ethertype |
+---------+---------+-----+-----------+
eCPRI common header: 1 byte
4b 3b 1b
+---------+----------+--------+
| Version | Reserved | Concat |
+---------+----------+--------+
eCPRI common header: 8 bytes
4b 3b 1b 1 2
+---------+----------+--------+---------+---------+
| Version | Reserved | Concat | Message | Payload |
| | | | Type | Size |
+---------+----------+--------+---------+---------+
Ethernet + eCPRI header: 26 bytes
eCPRI concatenated frames
*/
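/* Illustrative packed-struct view of the frame layout the code below actually
 * builds (offsets 14-21 in start() and prepare_thread()); note that those
 * offsets assume a 14-byte Ethernet header without the 802.1Q tag shown in
 * the 18-byte sketch above. Field names are assumptions for readability, the
 * driver itself works on raw byte offsets. */
typedef struct {
    uint8_t  ether_dhost[6];   /* destination MAC                  */
    uint8_t  ether_shost[6];   /* source MAC                       */
    uint16_t ether_type;       /* 0xaefe (eCPRI), big endian       */
    uint8_t  ecpri_common;     /* version 0x1, reserved, C = 0     */
    uint8_t  ecpri_msg_type;   /* 0x00: IQ data                    */
    uint16_t ecpri_payload_sz; /* 244, big endian                  */
    uint16_t ecpri_pc_id;      /* flow_id, big endian              */
    uint16_t ecpri_seq_id;     /* sequence id, big endian          */
    uint8_t  iq[240];          /* 4 x 60-byte compressed IQ blocks */
} __attribute__((packed)) ecpri_frame_sketch_t;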
typedef struct {
float re;
float im;
} Complex;
typedef struct {
const char * re_mac;
const char * rec_mac;
const char * rec_if;
int recv_affinity;
int send_affinity;
int prepare_affinity;
int decompress_affinity;
int ecpri_period;
int flow_id;
int sample_rate;
const char * log_directory;
} TRXEcpriState;
typedef struct {
int64_t counter;
int64_t pps_counter;
int64_t pps_ts;
int64_t pps;
} counter_stat_t;
/* Proprietary code:
- compression / decompression of IQ samples
- fast conversion between int16_t and float
*/
//#include "private/bf1_avx2.c"
#include "private/bf1_sse4.c"
// Buffers
/* rx_rbuf stores ethernet frames */
static ring_buffer_t rx_rbuf; // Received packets
static ring_buffer_t trxr_rbuf[RX_N_CHANNEL]; // Decoded IQ samples
static ring_buffer_t tx_rbuf; // Packets to send
static ring_buffer_t trxw_rbuf[TX_N_CHANNEL]; // Uncompressed IQ samples
static ring_buffer_t trxw_group_rbuf; // Group of IQ samples
// Counters
static volatile counter_stat_t recv_counter; // frames received from eRE
static volatile counter_stat_t decode_counter; // decoded frames
static volatile counter_stat_t read_counter; // frames passed to amarisoft stack
static volatile counter_stat_t write_counter; // samples to write from TRX
static volatile counter_stat_t encode_counter; // encoded frames
static volatile counter_stat_t sent_counter; // frames sent to eRE
static volatile counter_stat_t rx_drop_counter; // dropped RX frames
static volatile counter_stat_t tx_drop_counter; // dropped TX frames
// OLD
//static ring_buffer_t rx_rbuf;
//static ring_buffer_t trx_read_rbuf;
//static ring_buffer_t tx_rbuf;
//static ring_buffer_t trx_write_rbuf;
//static volatile int trx_wb_part[TRX_WB_MAX_PARTS]; // TODO write next index instead of current
//static volatile int64_t trx_wb_ts[TRX_WB_MAX_PARTS];
//static int trx_wb_part_read_index;
//static int trx_wb_part_write_index;
//// Locks
//pthread_mutex_t tx_mutex;
//pthread_cond_t tx_cond;
//pthread_mutex_t rx_mutex;
//pthread_cond_t rx_cond;
//pthread_mutex_t tx_ready_mutex;
//pthread_cond_t tx_ready_cond;
//sem_t trx_read_sem;
//// Counters
//static volatile int64_t prepared_frame_count;
//static volatile int64_t read_frame_count;
//static volatile int64_t sent_frame_count;
//// Computed values
//static int rxtx_buf_size;
//static int ecpri_period_mult;
// OLD
// Network
static volatile int seq_id;
static int send_sockfd;
static int recv_sockfd;
static struct sockaddr_ll connect_sk_addr;
static void print_stats(FILE * f, int print_header) {
if(print_header) {
fprintf(f,
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"%" STAT_INT_LEN "s "
"\n",
"rx dropped",
"tx dropped",
"received",
"decode",
"read",
"write",
"encode",
"sent",
"received pps",
"decode pps",
"read pps",
"write pps",
"encode pps",
"sent pps");
}
fprintf(f,
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 " "
"%" STAT_INT_LEN "" PRIi64 "pps "
"%" STAT_INT_LEN "" PRIi64 "pps "
"%" STAT_INT_LEN "" PRIi64 "pps "
"%" STAT_INT_LEN "" PRIi64 "pps "
"%" STAT_INT_LEN "" PRIi64 "pps "
"%" STAT_INT_LEN "" PRIi64 "pps "
"\n",
rx_drop_counter.counter,
tx_drop_counter.counter,
recv_counter.counter,
decode_counter.counter,
read_counter.counter,
write_counter.counter,
encode_counter.counter,
sent_counter.counter,
recv_counter.pps,
decode_counter.pps,
read_counter.pps,
write_counter.pps,
encode_counter.pps,
sent_counter.pps);
}
static void log_exit(const char * section, const char * msg, ...) {
time_t t;
struct tm ts;
char line[256];
va_list arglist;
time(&t);
ts = *localtime(&t);
strftime(line, 80, "%m-%d %H:%M:%S", &ts);
sprintf(line + strlen(line), " EXIT [%s] ", section);
va_start(arglist, msg);
vsprintf(line + strlen(line), msg, arglist);
va_end(arglist);
fprintf(stderr, "%s\n", line);
// Dump useful information
print_stats(stderr, 1);
fprintf(stderr, "TX RBUF: ri %d wi %d ra %d wa %d\n", tx_rbuf.read_index, tx_rbuf.write_index, rbuf_read_amount(&tx_rbuf), rbuf_write_amount(&tx_rbuf));
fprintf(stderr, "RX RBUF: ri %d wi %d ra %d wa %d\n", rx_rbuf.read_index, rx_rbuf.write_index, rbuf_read_amount(&rx_rbuf), rbuf_write_amount(&rx_rbuf));
fprintf(stderr, "TRXW RBUF: ri %d wi %d ra %d wa %d\n", trxw_rbuf[0].read_index, trxw_rbuf[0].write_index, rbuf_read_amount(&trxw_rbuf[0]), rbuf_write_amount(&trxw_rbuf[0]));
fprintf(stderr, "TRXR RBUF: ri %d wi %d ra %d wa %d\n", trxr_rbuf[0].read_index, trxr_rbuf[0].write_index, rbuf_read_amount(&trxr_rbuf[0]), rbuf_write_amount(&trxr_rbuf[0]));
fprintf(stderr, "TRXW GROUP RBUF: ri %d wi %d ra %d wa %d\n", trxw_group_rbuf.read_index, trxw_group_rbuf.write_index, rbuf_read_amount(&trxw_group_rbuf), rbuf_write_amount(&trxw_group_rbuf));
fflush(stdout);
fflush(stderr);
exit(EXIT_FAILURE);
}
static void init_counter(volatile counter_stat_t * c) {
c->counter = 0;
c->pps_counter = 0;
c->pps_ts = 0;
c->pps = 0;
}
static void update_counter_pps(volatile counter_stat_t * c) {
struct timespec _ts;
int64_t ts;
clock_gettime(CLOCK_TAI, &_ts);
ts = ts_to_int(_ts);
if((ts - c->pps_ts) > PPS_UPDATE_PERIOD) {
if(c->pps_ts)
c->pps = ((c->counter - c->pps_counter) * NSEC_PER_SEC) / (ts - c->pps_ts);
c->pps_counter = c->counter;
c->pps_ts = ts;
}
}
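/* Intended per-thread usage (sketch, inferred from the counters declared
 * above; not all of these calls are wired up yet in this snapshot):
 *   update_counter(&recv_counter, ret);   // e.g. after recvmmsg()
 *   update_counter_pps(&recv_counter);    // refresh the pps estimate
 */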
static void update_counter(volatile counter_stat_t * c, int64_t v) {
c->counter += v;
}
static void *recv_thread(void *p) {
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
int ret;
log_info("RECV_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->recv_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->recv_affinity);
for(;;) {
struct mmsghdr msgh[4000];
struct iovec msgv[4000];
memset(msgv, 0, sizeof(msgv));
memset(msgh, 0, sizeof(msgh));
for(int j = 0; j < ecpri_period_mult; j++) {
msgv[j].iov_base = RBUF_WRITE(rx_rbuf, uint8_t);
msgv[j].iov_len = rx_rbuf.len;
msgh[j].msg_hdr.msg_iov = &msgv[j];
msgh[j].msg_hdr.msg_iovlen = 1;
rbuf_update_write_index(&rx_rbuf);
}
ret = recvmmsg(recv_sockfd, msgh, ecpri_period_mult, 0, NULL);
if(ret == -1)
error(EXIT_FAILURE, errno, "recvmmsg error");
if(ret != ecpri_period_mult)
log_error("RECV_THREAD", "recvmmsg received %d messages instead of %d\n", ret, ecpri_period_mult);
}
pthread_exit(EXIT_SUCCESS);
}
static void *send_thread(void *p) {
cpu_set_t mask;
struct timespec initial, next;
struct timespec t1[4000];
struct timespec t2[4000];
int k = 0;
TRXEcpriState * s = (TRXEcpriState *) p;
struct mmsghdr msgh[4000];
struct iovec msgv[4000];
log_info("SEND_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->send_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->send_affinity);
memset(msgv, 0, sizeof(msgv));
memset(msgh, 0, sizeof(msgh));
for(int j = 0; j < ecpri_period_mult; j++) {
msgh[j].msg_hdr.msg_name = &connect_sk_addr;
msgh[j].msg_hdr.msg_namelen = sizeof(connect_sk_addr);
msgh[j].msg_hdr.msg_iov = &msgv[j];
msgh[j].msg_hdr.msg_iovlen = 1;
}
pthread_mutex_lock(&tx_ready_mutex);
pthread_cond_wait(&tx_ready_cond, &tx_ready_mutex);
pthread_mutex_unlock(&tx_ready_mutex);
clock_gettime(CLOCK_TAI, &initial);
for(int64_t i = 1;; i++) {
int ret, msg_sent;
#ifdef DEBUG
if(i > SEND_LIMIT) {
int64_t d, dt;
clock_gettime(CLOCK_TAI, &next);
d = calcdiff_ns(next, initial);
for(int j = 0; j < k; j++) {
dt = calcdiff_ns(t2[j], t1[j]);
log_debug("SEND_THREAD", "%" PRIi64, dt);
}
log_debug("SEND_THREAD", "Packets sent: %" PRIi64, sent_frame_count);
log_debug("SEND_THREAD", "Duration: %" PRIi64, d);
log_debug("SEND_THREAD", "ecpri_period_mult: %" PRIi64, ecpri_period_mult);
log_debug("SEND_THREAD", "FRAME_FREQ: %" PRIi64, FRAME_FREQ);
exit(EXIT_SUCCESS);
}
#endif
next = initial;
// Multiply by i every time to prevent any frequency drift
add_ns(&next, (ecpri_period_mult * NSEC_PER_SEC * i) / FRAME_FREQ);
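// e.g. if ecpri_period_mult were 32, iteration i would target
// initial + i * 32 * NSEC_PER_SEC / FRAME_FREQ ns (about i * 8333 ns),
// so rounding errors do not accumulate from one iteration to the next.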
for(int j = 0; j < ecpri_period_mult; j++) {
msgv[j].iov_base = RBUF_READ(tx_rbuf, uint8_t);
msgv[j].iov_len = tx_rbuf.len;
rbuf_update_read_index(&tx_rbuf);
}
for(msg_sent = 0; msg_sent < ecpri_period_mult;) {
#ifdef DEBUG
clock_gettime(CLOCK_TAI, &t1[k]);
#endif
ret = sendmmsg(send_sockfd, msgh + msg_sent, (ecpri_period_mult - msg_sent), 0);
#ifdef DEBUG
clock_gettime(CLOCK_TAI, &t2[k++]);
#endif
if(ret <= 0)
error(EXIT_FAILURE, errno, "sendmmsg error (returned %d)", ret);
msg_sent += ret;
sent_frame_count += ret;
}
pthread_mutex_lock(&tx_mutex);
pthread_cond_signal(&tx_cond);
pthread_mutex_unlock(&tx_mutex);
clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
}
pthread_exit(EXIT_SUCCESS);
}
static void *prepare_thread(void *p) {
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
int tx_ready_buffer_full = 0;
log_info("PREPARE_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->prepare_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->prepare_affinity);
for(int64_t i = 0;; i++) {
int16_t samples_int[256];
// If we have frames to prepare
int n = rbuf_write_amount(&tx_rbuf);
if((i == 0) || n) {
// If there are frames from trx_write callback to prepare
if(rbuf_read_amount(&trx_write_rbuf)) {
int64_t ts = trx_wb_ts[trx_wb_part_read_index];
int empty_frames_ahead = ts - prepared_frame_count;
empty_frames_ahead = empty_frames_ahead < n ? empty_frames_ahead : n;
if(empty_frames_ahead > 0) {
for(int j = 0; j < empty_frames_ahead; j++) {
*((uint16_t *) (RBUF_WRITE(tx_rbuf, uint8_t) + 20)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf);
prepared_frame_count++;
}
}
else if (empty_frames_ahead == 0) {
int m = trx_wb_part[(trx_wb_part_read_index + 1) % TRX_WB_MAX_PARTS] - trx_write_rbuf.read_index;
m = m < n ? m : n;
for(int j = 0; j < m; j++) {
float * const trx_samples = RBUF_READ(trx_write_rbuf, float);
uint8_t * const tx_frame = RBUF_WRITE(tx_rbuf, uint8_t);
memset(samples_int, 0, 512);
float_to_int16(samples_int, trx_samples, 256, 32767);
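// The 256 int16 samples are packed as four compressed 60-byte blocks right
// after the 22-byte Ethernet + eCPRI header (offsets 22, 82, 142, 202),
// filling the 262-byte frame.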
encode_bf1(tx_frame + 22 , samples_int);
encode_bf1(tx_frame + 22 + 60 , samples_int + 64);
encode_bf1(tx_frame + 22 + 120, samples_int + 128);
encode_bf1(tx_frame + 22 + 180, samples_int + 192);
*((uint16_t *)(tx_frame + 20)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf);
rbuf_update_read_index(&trx_write_rbuf);
prepared_frame_count++;
}
if(m == 0)
trx_wb_part_read_index = (trx_wb_part_read_index + 1) % TRX_WB_MAX_PARTS;
}
else {
log_error("PREPARE_THREAD", "missed trx_write timestamp");
}
}
else {
*((uint16_t *) (RBUF_WRITE(tx_rbuf, uint8_t) + 20)) = htons(seq_id++);
rbuf_update_write_index(&tx_rbuf);
prepared_frame_count++;
}
}
else {
if (!tx_ready_buffer_full) {
tx_ready_buffer_full = 1;
pthread_mutex_lock(&tx_ready_mutex);
pthread_cond_signal(&tx_ready_cond);
pthread_mutex_unlock(&tx_ready_mutex);
}
pthread_mutex_lock(&tx_mutex);
pthread_cond_wait(&tx_cond, &tx_mutex);
pthread_mutex_unlock(&tx_mutex);
}
}
pthread_exit(EXIT_SUCCESS);
}
static void *decompress_thread(void *p) {
cpu_set_t mask;
TRXEcpriState * s = (TRXEcpriState *) p;
int rx_ready = 0;
const float mult = 1. / 32767.;
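// Inverse of the 32767 scale applied on the encode side (float_to_int16 in
// prepare_thread), so decoded samples come back in roughly [-1.0, 1.0].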
log_info("DECOMPRESS_THREAD", "Thread init");
// Set thread CPU affinity
CPU_ZERO(&mask);
CPU_SET(s->decompress_affinity, &mask);
if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->decompress_affinity);
for(;;) {
int n = rbuf_read_amount(&rx_rbuf);
if(n) {
for(int j = 0; j < n; j++) {
int16_t samples_int[256];
const uint8_t * rx_samples = RBUF_READ(rx_rbuf, uint8_t) + 22;
// TODO : analyze seq_id, ecpri packet type etc... ?
// TODO : set rx_ready at some point (when ?)
if(rx_ready) {
memset(samples_int, 0, 512);
decode_bf1(samples_int , rx_samples , 16);
decode_bf1(samples_int + 64 , rx_samples + 60, 16);
decode_bf1(samples_int + 128, rx_samples + 120, 16);
decode_bf1(samples_int + 192, rx_samples + 180, 16);
int16_to_float(RBUF_WRITE(trx_read_rbuf, float), samples_int, 256, mult);
rbuf_update_read_index(&rx_rbuf);
rbuf_update_write_index(&trx_read_rbuf);
sem_post(&trx_read_sem);
}
}
}
else {
pthread_mutex_lock(&rx_mutex);
pthread_cond_wait(&rx_cond, &rx_mutex);
pthread_mutex_unlock(&rx_mutex);
}
}
pthread_exit(EXIT_SUCCESS);
}
static int start_threads(TRXEcpriState * s) {
pthread_t recv_pthread;
pthread_t send_pthread;
pthread_t prepare_pthread;
pthread_t decompress_pthread;
struct sched_param recv_param;
struct sched_param send_param;
struct sched_param prepare_param;
struct sched_param decompress_param;
pthread_attr_t recv_attr;
pthread_attr_t send_attr;
pthread_attr_t prepare_attr;
pthread_attr_t decompress_attr;
log_info("TRX_ECPRI", "Starting threads");
// Initialize pthread attributes (default values)
if (pthread_attr_init(&recv_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
// Set a specific stack size
if (pthread_attr_setstacksize(&recv_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
// Set scheduler policy and priority of pthread
if (pthread_attr_setschedpolicy(&recv_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
recv_param.sched_priority = 97;
if (pthread_attr_setschedparam(&recv_attr, &recv_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
/* Use scheduling parameters of attr */
if (pthread_attr_setinheritsched(&recv_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&send_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&send_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&send_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
send_param.sched_priority = 97;
if (pthread_attr_setschedparam(&send_attr, &send_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&send_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&prepare_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&prepare_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&prepare_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
prepare_param.sched_priority = 97;
if (pthread_attr_setschedparam(&prepare_attr, &prepare_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&prepare_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_attr_init(&decompress_attr))
log_error("TRX_ECPRI", "init pthread attributes failed\n");
if (pthread_attr_setstacksize(&decompress_attr, PTHREAD_STACK_MIN))
log_error("TRX_ECPRI", "pthread setstacksize failed\n");
if (pthread_attr_setschedpolicy(&decompress_attr, SCHED_FIFO))
log_error("TRX_ECPRI", "pthread setschedpolicy failed\n");
decompress_param.sched_priority = 97;
if (pthread_attr_setschedparam(&decompress_attr, &decompress_param))
log_error("TRX_ECPRI", "pthread setschedparam failed\n");
if (pthread_attr_setinheritsched(&decompress_attr, PTHREAD_EXPLICIT_SCHED))
log_error("TRX_ECPRI", "pthread setinheritsched failed\n");
if (pthread_create(&recv_pthread, NULL, recv_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create recv thread");
if (pthread_create(&send_pthread, NULL, send_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create send thread");
if (pthread_create(&prepare_pthread, NULL, prepare_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create prepare thread");
if (pthread_create(&decompress_pthread, NULL, decompress_thread, s))
error(EXIT_FAILURE, errno, "Couldn't create decompress thread");
return 0;
}
int start(TRXEcpriState * s) {
uint8_t dst_mac[6];
uint8_t src_mac[6];
uint8_t ecpri_packet[PACKET_SIZE];
struct ether_header *eh = (struct ether_header *) ecpri_packet;
int if_index;
log_debug("TRX_ECPRI", "raw socket setup");
//set_latency_target();
seq_id = 0;
init_counter(&rx_drop_counter);
init_counter(&tx_drop_counter);
init_counter(&recv_counter);
init_counter(&decode_counter);
init_counter(&read_counter);
init_counter(&write_counter);
init_counter(&encode_counter);
init_counter(&sent_counter);
RBUF_INIT(rx_rbuf, "RX ring buffer", TXRX_BUF_MAX_SIZE, RX_MAX_PACKET_SIZE, uint8_t);
RBUF_INIT(tx_rbuf, "TX ring buffer", TXRX_BUF_MAX_SIZE, TX_ECPRI_PACKET_SIZE, uint8_t);
for(int i = 0; i < TX_N_CHANNEL; i++) {
char rb_name[256];
sprintf(rb_name, "TRXWrite Ring Buffer %d", i);
RBUF_INIT(trxw_rbuf[i], rb_name, TRX_BUF_MAX_SIZE, N_SAMPLES, Complex);
}
for(int i = 0; i < RX_N_CHANNEL; i++) {
char rb_name[256];
sprintf(rb_name, "TRXRead Ring Buffer %d", i);
RBUF_INIT(trxr_rbuf[i], rb_name, TRX_BUF_MAX_SIZE, N_SAMPLES, Complex);
}
RBUF_INIT(trxw_group_rbuf, "TRXGroupWrite ring buffer", TRX_MAX_GROUP, 1, sample_group_t);
trx_wb_part_read_index = 0;
trx_wb_part_write_index = 0;
pthread_mutex_init(&tx_mutex, NULL);
pthread_mutex_init(&rx_mutex, NULL);
pthread_mutex_init(&tx_ready_mutex, NULL);
pthread_cond_init(&tx_cond, NULL);
pthread_cond_init(&rx_cond, NULL);
pthread_cond_init(&tx_ready_cond, NULL);
sem_init(&trx_read_sem, 0, 0);
memset((uint8_t *) ecpri_packet, 0, PACKET_SIZE);
if (!(if_index = if_nametoindex(s->rec_if))) {
perror("if_nametoindex");
return 1;
}
if(sscanf(s->re_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c", &dst_mac[0], &dst_mac[1], &dst_mac[2], &dst_mac[3], &dst_mac[4], &dst_mac[5]) != 6)
fprintf(stderr, "Invalid eRE MAC address\n");
if(sscanf(s->rec_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c", &src_mac[0], &src_mac[1], &src_mac[2], &src_mac[3], &src_mac[4], &src_mac[5]) != 6)
fprintf(stderr, "Invalid eREC MAC address\n");
if ((send_sockfd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL))) == -1) {
perror("Socket Error");
return 1;
}
if ((recv_sockfd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL))) == -1) {
perror("Socket Error");
return 1;
}
connect_sk_addr.sll_ifindex = if_index;
connect_sk_addr.sll_halen = ETH_ALEN;
for(int i = 0; i < 6; i++)
connect_sk_addr.sll_addr[i] = dst_mac[i];
log_debug("TRX_ECPRI", "bind");
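// The log line above suggests a bind step, but no bind() call is present in
// this snapshot. Binding recv_sockfd to the fronthaul interface (sketch,
// assumption) would fill a struct sockaddr_ll with sll_family = AF_PACKET,
// sll_protocol = htons(ETH_P_ALL) and sll_ifindex = if_index, then call
// bind(recv_sockfd, (struct sockaddr *) &addr, sizeof(addr)).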
for(int i = 0; i < 6; i++)
eh->ether_shost[i] = src_mac[i];
for(int i = 0; i < 6; i++)
eh->ether_dhost[i] = dst_mac[i];
/* Ethertype field */
eh->ether_type = htons(0xaefe);
/* Standard Header */
ecpri_packet[14] = 0x10; // Protocol data revision 0x1, C = 0
// Message type = 0x00, IQ data
// Payload size
*((uint16_t *) (ecpri_packet + 16)) = htons(244);
*((uint16_t *) (ecpri_packet + 18)) = htons(s->flow_id);
for(int i = 0; i < rxtx_buf_size; i++) {
//log_debug("TRX_ECPRI", "%d / %d - %d\n", i, rxtx_buf_size, tx_rbuf.len);
memcpy(((uint8_t *) tx_rbuf.buffer) + (i * tx_rbuf.len), ecpri_packet, tx_rbuf.len);
}
start_threads(s);
return 0;
}
static void trx_ecpri_end(TRXState *s1)
{
log_info("TRX_ECPRI", "End");
TRXEcpriState *s = s1->opaque;
free(s);
}
static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void **__samples, int count, int tx_port_index, TRXWriteMetadata *md)
{
(void) s1;
float ** _samples = (float **) __samples;
int write_count = count >> 5;
int64_t ts = timestamp >> 5;
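// count and timestamp are expressed in samples; each eCPRI frame carries
// N_SAMPLES (32) samples per channel, hence the divide by 32 (>> 5).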
trx_wb_part[trx_wb_part_write_index] = trx_write_rbuf.write_index;
trx_wb_ts[trx_wb_part_write_index] = ts;
for(int k = 0; k < write_count; k++) {
for(int i = 0; i < 4; i++)
for(int j = 0; j < 64; j++)
RBUF_WRITE(trx_write_rbuf, float)[i * 64 + j] = _samples[i][j + (k << 6)];
rbuf_update_write_index(&trx_write_rbuf);
}
trx_wb_part_write_index = (trx_wb_part_write_index + 1) % TRX_WB_MAX_PARTS;
trx_wb_part[trx_wb_part_write_index] = trx_write_rbuf.write_index + write_count;
}
static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__samples, int count, int rx_port_index, TRXReadMetadata *md)
{
(void) s1;
float ** _samples = (float **) __samples;
int read_count = (count / M);
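// M is presumably the samples-per-frame factor (32 elsewhere in this file);
// read_count is the number of frames needed to deliver 'count' samples.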
for(int k = 0; k < read_count; k++) {
float * trx_samples;
sem_wait(&trx_read_sem);
trx_samples = RBUF_READ(trx_read_rbuf, float);
for(int i = 0; i < 4; i++)
for(int j = 0; j < 64; j++)
_samples[i][j + (k << 6)] = trx_samples[i * 64 + j];
rbuf_update_read_index(&trx_read_rbuf);
}
*ptimestamp = recv_counter.counter * M;
update_counter(&read_counter, read_count);
return count;
}
/* This function can be used to automatically set the sample
rate. Here we don't implement it, so the user has to force a given
sample rate with the "sample_rate" configuration option */
static int trx_ecpri_get_sample_rate(TRXState *s1, TRXFraction *psample_rate,
int *psample_rate_num, int sample_rate_min)
{
return -1;
}
static int trx_ecpri_start(TRXState *s1, const TRXDriverParams *params)
{
TRXEcpriState *s = s1->opaque;
log_info("TRX_ECPRI_START", "Start");
log_info("TRX_ECPRI_START", "trx_api_version: %d", s1->trx_api_version);
log_info("TRX_ECPRI_START", "config file: %s", s1->path);
log_info("TEST-DPDK-ECPRI", "rec-mac: %s, re-mac: %s, rec-if: %s", s->rec_mac, s->re_mac, s->rec_if);
s->sample_rate = params->sample_rate[0].num / params->sample_rate[0].den;
start(s);
return 0;
}
void dummy_enb_init(TRXState *s1, TRXEcpriState *s) {
s1->trx_write_func2 = trx_ecpri_write;
s1->trx_read_func2 = trx_ecpri_read;
start(s);
}
int trx_driver_init(TRXState *s1)
{
TRXEcpriState *s;
double val;
// Lock all current and future pages to prevent them from being paged out
// to swap
if (mlockall(MCL_CURRENT | MCL_FUTURE)) {
log_error("TRX_ECPRI", "mlockall failed");
}
log_info("TRX_ECPRI", "Init");
if (s1->trx_api_version != TRX_API_VERSION) {
fprintf(stderr, "ABI compatibility mismatch between LTEENB and TRX driver (LTEENB ABI version=%d, TRX driver ABI version=%d)\n",
s1->trx_api_version, TRX_API_VERSION);
return -1;
}
s = malloc(sizeof(TRXEcpriState));
memset(s, 0, sizeof(*s));
trx_get_param_double(s1, &val, "recv_affinity");
s->recv_affinity = (int) val;
trx_get_param_double(s1, &val, "send_affinity");
s->send_affinity = (int) val;
trx_get_param_double(s1, &val, "prepare_affinity");
s->prepare_affinity = (int) val;
trx_get_param_double(s1, &val, "decompress_affinity");
s->decompress_affinity = (int) val;
trx_get_param_double(s1, &val, "flow_id");
s->flow_id = (int) val;
trx_get_param_double(s1, &val, "ecpri_period");
if(((int) val) == 0) {
fprintf(stderr, "ecpri_period parameter must not be zero\n");
return -1;
}
s->ecpri_period = (int) val;
s->re_mac = trx_get_param_string(s1, "re_mac");
s->rec_mac = trx_get_param_string(s1, "rec_mac");
s->rec_if = trx_get_param_string(s1, "rec_if");
s->log_directory = trx_get_param_string(s1, "log_directory");
s1->opaque = s;
s1->trx_end_func = trx_ecpri_end;
s1->trx_write_func2 = trx_ecpri_write;
s1->trx_read_func2 = trx_ecpri_read;
s1->trx_start_func = trx_ecpri_start;
s1->trx_get_sample_rate_func = trx_ecpri_get_sample_rate;
return 0;
}