#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <error.h>
#include <fcntl.h>
#include <getopt.h>
#include <immintrin.h>
#include <inttypes.h>
#include <limits.h>
#include <linux/if_packet.h>
#include <math.h>
#include <netdb.h>
#include <netinet/ether.h>
#include <netinet/in.h>
#include <net/if.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <signal.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

#define DPDK

#ifdef DPDK
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_ether.h>
#include <rte_cycles.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <rte_ether.h>
#include <rte_ip.h>
#include <rte_udp.h>
#endif

#include "private/trx_driver.h"
#include "utils.c"

//#define DEBUG // Enables / deactivates log_debug
//#define STAT_SHOW_COUNTERS // Also print the raw counter values in print_stats
// Minimum interval (ns) between two rate recomputations in update_counter_pps
#define PPS_UPDATE_PERIOD INT64_C(1500000000)
#define MAX_PACKET_SIZE 1024
// NOTE(review): presumably the IQ samples per channel carried by one frame;
// decode_thread advances its output by a literal 32 per frame.
#define N_SAMPLES (32)
#define TRX_MAX_GROUP 1500
// Column width of print_stats, as a string so it can be pasted into formats
#define STAT_INT_LEN "8"
//#define DST_ADDR_SYNTAX // Depends on DPDK version

/* One IQ sample as two consecutive floats (real, imaginary).
 * Arrays of Complex are handed to the encode/decode kernels as float *,
 * so this layout must not change. */
typedef struct {
  float re;
  float im;
} Complex;

/* Driver configuration shared (read-only) by all worker threads. */
typedef struct {
// Log
  const uint8_t * log_directory; // directory for ecpri.stats and *.trace files
// Network
  const uint8_t * re_mac;
  const uint8_t * rec_mac;
  const uint8_t * rec_if;
  const char * dpdk_options;
// RF
  int rx_n_channel; // RX channels; per-channel ring arrays hold at most 4
  int tx_n_channel; // TX channels; per-channel ring arrays hold at most 4
  int frame_frequency; // eCPRI frames per second (used as freq in gettime)
  int tdd_period; // frames between two TDD frame-start flags
  int sample_rate;
// Perfomance / RT
  int recv_affinity; // CPU of recv_thread
  int send_affinity; // CPU of send_thread
  int encode_affinity; // CPU of encode_thread
  int decode_affinity; // CPU of decode_thread
  int statistic_affinity; // CPU of statistic_thread
  int ecpri_period;
  int trx_buf_size;
  int txrx_buf_size;
  int encode_burst; // frames produced per encode_thread iteration (0: as many as fit)
  int send_burst;
  int statistics_refresh_rate_ns; // statistic_thread period
// Trace / Monitor
  int trace_rx; // capture RX rings to files, then exit
  int trace_tx; // capture TX rings to files, then exit
  int trace_offset; // frames skipped before the captured window
  int monitor_pps;
  int monitor_trigger_duration;
// eCPRI
  int flow_id;
  int start_receiving; // decode_thread fakes RX frames until real packets arrive
  int master; // master (1) or slave (0) timing mode
  int trx_read_null;
  int one_way_measure; // enable eCPRI one-way delay measurement messages
  int one_way_period; // frames between two measurement messages
  int tdd_frame_start; // write a TDD frame-start flag after the IQ payload
  int timestamp_frames; // write a 64-bit timestamp after the IQ payload
  int master_timestamp_check_period; // frames between rxtx_shift updates (master)
  int slave_timestamp_check_period; // frames between slave_ts_offset updates (slave)
  int disable_rx_copy;
} TRXEcpriState;

/* Monotonic event counter plus a periodically refreshed rate (see update_counter_pps). */
typedef struct {
  int64_t counter; // total events since start (or since last reset)
  int64_t pps_counter; // value of counter when the rate was last computed
  int64_t pps_ts; // CLOCK_TAI time (ts_to_int) of that snapshot, 0 before the first one
  int64_t pps; // events per second over the last update window
} counter_stat_t;

/* Single-producer / single-consumer ring of buf_len slots, each slot holding
 * len elements of the type given to RBUF_INIT. One slot is always kept free
 * (see rbuf_write_amount) so the indices alone tell full from empty. */
typedef struct {
  volatile void * buffer; // buf_len * len elements
  char name[64]; // label for logs
  int buf_len; // number of slots
  int len; // elements per slot
  volatile int write_index; // next slot to write (producer)
  volatile int read_index; // next slot to read (consumer)
  volatile int write_ahead; // NOTE(review): only initialised in the code shown
} ring_buffer_t;

/* A run of TX frames queued in trxw_group_rbuf and consumed by encode_trx. */
typedef struct {
  int64_t count; // frames left in the group; if wait is set, an absolute trx_encode_counter target instead
  uint8_t wait; // count is absolute: emit empty frames until trx_encode_counter reaches it
  uint8_t zeroes; // encode empty frames instead of IQ samples
} sample_group_t;

/* Proprietary code:
     - compression / decompression of IQ samples
     - fast conversion between int16_t and float
*/
#include "private/bf1_avx2.c"
//#include "private/bf1_avx2_nop.c"

// Buffers
static ring_buffer_t rx_rbuf; // Received packets (whole frames as received, one per slot)
static ring_buffer_t trxr_rbuf[4]; // Decoded IQ samples, one ring per RX channel (indices of [0] are shared)
static ring_buffer_t tx_rbuf; // Packets to send (without the Ethernet header, added by send_thread)
static ring_buffer_t trxw_rbuf[4]; // Uncompressed IQ samples, one ring per TX channel (indices of [0] are shared)
static ring_buffer_t trxw_group_rbuf; // Groups of IQ samples (sample_group_t) waiting to be encoded

static ring_buffer_t one_way_rbuf; // One way delay measurements (received minus sent timestamp, int64_t)

// Counters
static volatile counter_stat_t recv_counter; // frames received from eRE
static volatile counter_stat_t decode_counter; // decoded frames
static volatile counter_stat_t read_counter; // frames passed to amarisoft stack
static volatile counter_stat_t write_counter; // samples to write from TRX
static volatile counter_stat_t encode_counter; // encoded frames (from TRX samples and empty)
static volatile counter_stat_t trx_encode_counter; // frames encoded from TRX sample groups
static volatile counter_stat_t sent_counter; // frames sent to eRE
static volatile counter_stat_t rx_drop_counter; // received frames dropped because rx_rbuf was full
static volatile counter_stat_t tx_drop_counter; // TX drops; NOTE(review): not updated by the send/encode code shown
static volatile counter_stat_t empty_encode_counter; // empty frames added by the master encode loop to complete a burst

static volatile int sync_complete = 0;
static volatile int received_pkts = 0; // set by recv_thread once any packet has been received
static volatile int recv_pps_threshold_hit = 0;
static int first_trx_write = 1;

#ifndef DPDK
static uint8_t pkt_frame_full[1024];
#endif

static volatile int rx_trace_ready = 0; // RX capture window complete
static volatile int tx_trace_ready = 0; // TX capture window complete
static int64_t encode_counter_prev = 0; // TX trace start offset
static int64_t decode_counter_prev = 0; // decode_counter value before its reset on first real RX

static int recv_pps_threshold;

//DEBUG
static volatile int min_count_trx = 1000000;

// Network
static volatile int tx_seq_id; // eCPRI sequence id written into each TX frame
static volatile int rx_seq_id; // last RX sequence id seen (master mode), wrap-corrected
static volatile uint64_t one_way_timestamp; // CLOCK_TAI time of the last one-way measurement sent
static volatile int64_t rxtx_shift; // master: computed from received timestamps, added when timestamping TX frames
static volatile int64_t slave_ts_offset; // slave: local CLOCK_TAI time minus received timestamp

static int64_t tx_initial_ts; // TX time origin for gettime (set by encode_thread in master mode)
static int64_t sync_encode_counter; // frames timestamped so far

/* Producer side: publish one more slot, wrapping at buf_len. */
static void rbuf_update_write_index(ring_buffer_t * rbuf) {
  const int next_slot = rbuf->write_index + 1;
  rbuf->write_index = next_slot % rbuf->buf_len;
}
/* Consumer side: release one slot, wrapping at buf_len. */
static void rbuf_update_read_index(ring_buffer_t * rbuf) {
  const int next_slot = rbuf->read_index + 1;
  rbuf->read_index = next_slot % rbuf->buf_len;
}
/* Number of filled slots the consumer may read (0 .. buf_len - 1). */
static int rbuf_read_amount(ring_buffer_t * rbuf) {
  const int filled = rbuf->write_index - rbuf->read_index;
  return (filled + rbuf->buf_len) % rbuf->buf_len;
}
/* Number of slots the producer may write. One slot is kept free so the
 * write index never catches up with the read index; that way full and
 * empty are distinguishable without locks. */
static int rbuf_write_amount(ring_buffer_t * rbuf) {
  const int free_slots = rbuf->read_index - rbuf->write_index - 1;
  return (free_slots + rbuf->buf_len) % rbuf->buf_len;
}
/* Largest count <= n that can be processed without wrapping: bounded by the
 * slots left before the end of rbuf1 (from its read index) and of rbuf2
 * (from its write index). Either ring may be NULL to skip its bound.
 * Does not check how many slots are actually filled/free. */
static int rbuf_contiguous_copy(ring_buffer_t * rbuf1, ring_buffer_t * rbuf2, int n) {
  int limit = n;
  if(rbuf1 && rbuf1->buf_len - rbuf1->read_index < limit)
    limit = rbuf1->buf_len - rbuf1->read_index;
  if(rbuf2 && rbuf2->buf_len - rbuf2->write_index < limit)
    limit = rbuf2->buf_len - rbuf2->write_index;
  return limit;
}
/* Slot accessors: pointer (as type *) to the slot at the read/write index,
 * or i slots after it with wrap-around. rbuf is a ring_buffer_t lvalue
 * (not a pointer). Arguments are parenthesised so expressions are safe. */
#define RBUF_READ0(rbuf, type) (((type *) (rbuf).buffer) + ((rbuf).read_index * (rbuf).len))
#define RBUF_WRITE0(rbuf, type) (((type *) (rbuf).buffer) + ((rbuf).write_index * (rbuf).len))
#define RBUF_READ(rbuf, i, type) (((type *) (rbuf).buffer) + ((((rbuf).read_index + (i)) % (rbuf).buf_len) * (rbuf).len))
#define RBUF_WRITE(rbuf, i, type) (((type *) (rbuf).buffer) + ((((rbuf).write_index + (i)) % (rbuf).buf_len) * (rbuf).len))
/* Allocates _buf_len slots of _len elements of type and resets the indices.
 * Exits the process if the allocation fails; the name is truncated to fit. */
#define RBUF_INIT(rbuf, _name, _buf_len, _len, type) do\
{\
  size_t rbuf_init_bytes_ = ((size_t) (_buf_len)) * ((size_t) (_len)) * sizeof(type);\
  log_debug("TRX_ECPRI", "Allocating %s with %zu bytes\n", (_name), rbuf_init_bytes_);\
  (rbuf).buffer = (type *) malloc(rbuf_init_bytes_);\
  if(!(rbuf).buffer) {\
    fprintf(stderr, "Could not allocate %zu bytes for %s\n", rbuf_init_bytes_, (_name));\
    exit(EXIT_FAILURE);\
  }\
  snprintf((rbuf).name, sizeof((rbuf).name), "%s", (_name));\
  (rbuf).buf_len = (_buf_len);\
  (rbuf).len = (_len);\
  (rbuf).write_index = 0;\
  (rbuf).read_index = 0;\
  (rbuf).write_ahead = 0;\
} while(0)

/*
 * Prints one line of statistics to f. When print_header is non-zero, a
 * two-line column header is printed first.
 *
 * Columns, in order:
 * - RX and TX dropped-frame counters;
 * - the raw counters, only when STAT_SHOW_COUNTERS is defined;
 * - the rates computed by update_counter_pps;
 * - the fill level (rbuf_read_amount) of rx_rbuf, trxr_rbuf[0], trxw_rbuf[0]
 *   and tx_rbuf;
 * - rxtx_shift and slave_ts_offset.
 *
 * The header strings, the format strings and the argument lists are matched
 * purely by position, so they must be edited together and under the same #ifdef.
 *
 * NOTE(review): the last two columns are labelled "SLAVE" then "MASTER".
 * The values printed there are rxtx_shift (computed by the decoder in master
 * mode) and slave_ts_offset (computed in slave mode). Confirm the labels are
 * not swapped.
 * NOTE(review): the ring fill levels are labelled "DELAY" although they are
 * slot counts.
 */
static void print_stats(FILE * f, int print_header) {
  if(print_header) {
    fprintf(f, 
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
#ifdef STAT_SHOW_COUNTERS
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
#endif
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "\n",
        "RX",
        "TX",
#ifdef STAT_SHOW_COUNTERS
        "RECEIVED",
        "DECODE",
        "READ",
        "WRITE",
        "ENCODE",
        "SENT",
        "EMPTY TX",
#endif
        "RECEIVED",
        "DECODE",
        "READ",
        "WRITE",
        "ENCODE",
        "SENT",
        "EMPTY TX",
        "RX",
        "TRXR",
        "TRXW",
        "TX",
        "SLAVE",
        "MASTER");
    fprintf(f, 
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
#ifdef STAT_SHOW_COUNTERS
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
#endif
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "%-" STAT_INT_LEN "s "
        "\n",
        "DROPPED",
        "DROPPED",
#ifdef STAT_SHOW_COUNTERS
        "COUNTER",
        "COUNTER",
        "COUNTER",
        "COUNTER",
        "COUNTER",
        "COUNTER",
        "COUNTER",
#endif
        "PPS",
        "PPS",
        "PPS",
        "PPS",
        "PPS",
        "PPS",
        "PPS",
        "DELAY",
        "DELAY",
        "DELAY",
        "DELAY",
        "",
        "");
  }
  // Values: int64_t counters/rates, int ring fill levels, int64_t offsets
  fprintf(f, 
      "%-" STAT_INT_LEN "" PRIi64 " "
      "%-" STAT_INT_LEN "" PRIi64 " "
#ifdef STAT_SHOW_COUNTERS
      "%-" STAT_INT_LEN "" PRIi64 " "
      "%-" STAT_INT_LEN "" PRIi64 " "
      "%-" STAT_INT_LEN "" PRIi64 " "
      "%-" STAT_INT_LEN "" PRIi64 " "
      "%-" STAT_INT_LEN "" PRIi64 " "
      "%-" STAT_INT_LEN "" PRIi64 " "
      "%-" STAT_INT_LEN "" PRIi64 " "
#endif
      "%-" STAT_INT_LEN "" PRIi64 " "
      "%-" STAT_INT_LEN "" PRIi64 " "
      "%-" STAT_INT_LEN "" PRIi64 " "
      "%-" STAT_INT_LEN "" PRIi64 " "
      "%-" STAT_INT_LEN "" PRIi64 " "
      "%-" STAT_INT_LEN "" PRIi64 " "
      "%-" STAT_INT_LEN "" PRIi64 " "
      "%-" STAT_INT_LEN "d "
      "%-" STAT_INT_LEN "d "
      "%-" STAT_INT_LEN "d "
      "%-" STAT_INT_LEN "d "
      "%-" STAT_INT_LEN "" PRIi64 " "
      "%-" STAT_INT_LEN "" PRIi64 " "
      "\n",
      rx_drop_counter.counter,
      tx_drop_counter.counter,
#ifdef STAT_SHOW_COUNTERS
      recv_counter.counter,
      decode_counter.counter,
      read_counter.counter,
      write_counter.counter,
      encode_counter.counter,
      sent_counter.counter,
      empty_encode_counter.counter,
#endif
      recv_counter.pps,
      decode_counter.pps,
      read_counter.pps,
      write_counter.pps,
      encode_counter.pps,
      sent_counter.pps,
      empty_encode_counter.pps,
      rbuf_read_amount(&rx_rbuf),
      rbuf_read_amount(&trxr_rbuf[0]),
      rbuf_read_amount(&trxw_rbuf[0]),
      rbuf_read_amount(&tx_rbuf),
      rxtx_shift,
      slave_ts_offset);
}

/*
 * Prints a timestamped fatal message, a snapshot of the statistics and ring
 * buffer indices to stderr, then terminates the process with EXIT_FAILURE.
 *
 * section: tag shown between brackets.
 * msg, ...: printf-style message; truncated to fit the 256-byte line buffer.
 * Never returns.
 */
static __attribute__((format(printf, 2, 3), noreturn))
void log_exit(const char * section, const char * msg, ...) {
  time_t t;
  struct tm * now;
  char line[256];
  size_t used;
  va_list arglist;

  time(&t);
  now = localtime(&t);
  // Fall back to an empty prefix if the local time cannot be obtained
  if(!now || !strftime(line, sizeof(line), "%m-%d %H:%M:%S", now))
    line[0] = '\0';

  // Bounded writes: a long section name or message is truncated, never overflows
  used = strlen(line);
  snprintf(line + used, sizeof(line) - used, " EXIT [%s] ", section);
  used = strlen(line);
  va_start(arglist, msg);
  vsnprintf(line + used, sizeof(line) - used, msg, arglist);
  va_end(arglist);
  fprintf(stderr, "%s\n", line);

  // Dump useful information
  print_stats(stderr, 1);
  fprintf(stderr, "TX RBUF: ri %d wi %d ra %d wa %d\n", tx_rbuf.read_index, tx_rbuf.write_index, rbuf_read_amount(&tx_rbuf), rbuf_write_amount(&tx_rbuf));

  fprintf(stderr, "RX RBUF: ri %d wi %d ra %d wa %d\n", rx_rbuf.read_index, rx_rbuf.write_index, rbuf_read_amount(&rx_rbuf), rbuf_write_amount(&rx_rbuf));
  fprintf(stderr, "TRXW RBUF: ri %d wi %d ra %d wa %d\n", trxw_rbuf[0].read_index, trxw_rbuf[0].write_index, rbuf_read_amount(&trxw_rbuf[0]), rbuf_write_amount(&trxw_rbuf[0]));
  fprintf(stderr, "TRXR RBUF: ri %d wi %d ra %d wa %d\n", trxr_rbuf[0].read_index, trxr_rbuf[0].write_index, rbuf_read_amount(&trxr_rbuf[0]), rbuf_write_amount(&trxr_rbuf[0]));
  fprintf(stderr, "TRXW GROUP RBUF: ri %d wi %d ra %d wa %d\n", trxw_group_rbuf.read_index, trxw_group_rbuf.write_index, rbuf_read_amount(&trxw_group_rbuf), rbuf_write_amount(&trxw_group_rbuf));

  fflush(stdout);
  fflush(stderr);
  exit(EXIT_FAILURE);
}


// Maximum number of packets handed to one rte_eth_tx_burst call
#define BURST_SIZE 16
// Number of tx_rbuf packets turned into mbufs per send_thread iteration
#define TX_POOL_SIZE 16
#ifdef DPDK
#include "dpdk.c"
/* DPDK */
#else

/* Without DPDK: a minimal stand-in so the receive path compiles.
 * NOTE(review): the non-DPDK receive path never dereferences the mbufs. */
struct rte_mbuf {
  int buf_addr;
  int data_off;
};

/* No-op replacement for the DPDK mbuf release. */
void rte_pktmbuf_free(void * pkt) {
  (void) pkt;
  //for(int i = 0; i < 1000; i ++)
  //  asm("NOP");
}
#endif

/* Resets every field of a counter to zero: total, rate snapshot and rate. */
static void init_counter(volatile counter_stat_t * c) {
  c->pps = c->pps_ts = c->pps_counter = c->counter = 0;
}
/*
 * Refreshes c->pps (events per second) once more than PPS_UPDATE_PERIOD ns
 * have elapsed since the previous snapshot. The very first refresh only
 * records the reference point (pps_ts is 0 before that).
 */
static void update_counter_pps(volatile counter_stat_t * c) {
  struct timespec now_ts;
  int64_t now;
  int64_t elapsed;

  clock_gettime(CLOCK_TAI, &now_ts);
  now = ts_to_int(now_ts);
  elapsed = now - c->pps_ts;
  if(elapsed <= PPS_UPDATE_PERIOD)
    return;

  if(c->pps_ts != 0)
    c->pps = ((c->counter - c->pps_counter) * NSEC_PER_SEC) / elapsed;
  c->pps_counter = c->counter;
  c->pps_ts = now;
}
/* Adds v events to the running total of c. */
static void update_counter(volatile counter_stat_t * c, int64_t v) {
  c->counter = c->counter + v;
}

// Former chunk size of gettime's overflow-avoidance loop; kept for compatibility.
#define GETTIME_MAX_C INT64_C(7000000000)
/*
 * Converts a frame counter into a time in nanoseconds:
 *     initial_ts + counter * NSEC_PER_SEC / freq   (truncated toward zero)
 *
 * The product is split into whole periods (counter / freq) and a remainder,
 * so the intermediate never overflows int64_t as long as |freq| < ~9.2e9.
 * The split is exact: unlike the previous chunked loop, no rounding error
 * accumulates for large counters. It is O(1) and also handles negative counters.
 *
 * freq must be non-zero.
 */
static int64_t gettime(int64_t initial_ts, int64_t counter, int64_t freq) {
  const int64_t whole = counter / freq;
  const int64_t rest = counter % freq; // same sign as counter, |rest| < |freq|
  return initial_ts + whole * NSEC_PER_SEC + (rest * NSEC_PER_SEC) / freq;
}

static void trace_handler(struct timespec initial, TRXEcpriState * s) {
  struct timespec next;
  int ready = 1;
  if(s->trace_tx)
    ready &= tx_trace_ready;
  if(s->trace_rx)
    ready &= rx_trace_ready;
  if(ready) {
    int64_t d;
    FILE * f;
    char n[256];
    int start;
    uint8_t ones[14];
    for(int i = 0; i < 14; i++)
      ones[i] = 0xff;

    clock_gettime(CLOCK_TAI, &next);
    d = calcdiff_ns(next, initial);
    log_info("TRACE", "Packets sent: %" PRIi64, sent_counter.counter);
    log_info("TRACE", "Duration: %" PRIi64, d);

    usleep(1000 * 200);

    memset(n, '\0', 256);
    sprintf(n, "%s/tx.trace", s->log_directory);
    f = fopen(n, "wb+");
    start = (s->trace_offset + encode_counter_prev) % tx_rbuf.buf_len;
    log_info("TRACE", "Writing %d frames to tx.trace", (tx_rbuf.write_index + tx_rbuf.buf_len - start) % tx_rbuf.buf_len);
    for(int i = start; i != tx_rbuf.write_index; i = (i + 1) % tx_rbuf.buf_len) {
      fwrite(ones, 14, 1, f);
      fwrite(((uint8_t*) tx_rbuf.buffer) + i * tx_rbuf.len, tx_rbuf.len, 1, f);
    }
    fclose(f);

    memset(n, '\0', 256);
    sprintf(n, "%s/trxw.trace", s->log_directory);
    f = fopen(n, "wb+");
    start = (s->trace_offset) % trxw_rbuf[0].buf_len;
    log_info("TRACE", "Writing %d frames to trxw.trace", (trxw_rbuf[0].write_index + trxw_rbuf[0].buf_len - start) % trxw_rbuf[0].buf_len);
    for(int i = start; i != trxw_rbuf[0].write_index; i = (i + 1) % trxw_rbuf[0].buf_len) {
      for(int j = 0; j < s->tx_n_channel; j++)
        fwrite((uint8_t *) (((Complex *) trxw_rbuf[j].buffer) + i * trxw_rbuf[0].len), trxw_rbuf[0].len * sizeof(Complex), 1, f);
    }
    fclose(f);

    memset(n, '\0', 256);
    sprintf(n, "%s/rx.trace", s->log_directory);
    f = fopen(n, "wb+");
    start = s->trace_offset % rx_rbuf.buf_len;
    log_info("TRACE", "Writing %d frames to rx.trace", (rx_rbuf.write_index + rx_rbuf.buf_len - start) % rx_rbuf.buf_len);
    for(int i = start; i != rx_rbuf.write_index; i = (i + 1) % rx_rbuf.buf_len) {
      fwrite(((uint8_t*) rx_rbuf.buffer) + i * rx_rbuf.len, rx_rbuf.len, 1, f);
    }
    fclose(f);

    memset(n, '\0', 256);
    sprintf(n, "%s/trxr.trace", s->log_directory);
    f = fopen(n, "wb+");
    start = (s->trace_offset + decode_counter_prev) % trxr_rbuf[0].buf_len;
    log_info("TRACE", "Writing %d frames to trxr.trace", (trxr_rbuf[0].write_index + trxr_rbuf[0].buf_len - start) % trxr_rbuf[0].buf_len);
    for(int i = start; i != trxr_rbuf[0].write_index; i = (i + 1) % trxr_rbuf[0].buf_len) {
      for(int j = 0; j < s->rx_n_channel; j++)
        fwrite((uint8_t *) (((Complex *) trxr_rbuf[j].buffer) + i * trxr_rbuf[0].len), trxr_rbuf[0].len * sizeof(Complex), 1, f);
    }
    fclose(f);
    log_exit("", "Finished tracing");
  }
}

/*
 * Receive thread.
 *
 * With DPDK, it polls port 0 / queue 0 and copies eCPRI frames (ethertype
 * 0xAEFE), Ethernet header included, into rx_rbuf, one frame per slot:
 * - If rx_rbuf cannot hold the whole burst, the burst is freed and counted
 *   in rx_drop_counter.
 * - Frames with another ethertype are freed and skipped. The kept frames are
 *   stored back to back, so the decoder never sees a stale slot.
 * - In master mode the eCPRI sequence id (offset 20) is tracked in rx_seq_id.
 *
 * Without DPDK, it only simulates frame_frequency frames per second by
 * advancing the counters.
 *
 * When RX tracing is enabled, the thread exits once the capture window is full.
 * p: the TRXEcpriState configuration.
 */
static void *recv_thread(void *p) {

  cpu_set_t mask;
  TRXEcpriState * s = (TRXEcpriState *) p;

#ifdef DPDK
  int first_seq_id = 1;
#else
  int64_t target_counter = 0;
  struct timespec current, previous;
#endif

  log_info("RECV_THREAD", "Thread init");
  // Set thread CPU affinity
  CPU_ZERO(&mask);
  CPU_SET(s->recv_affinity, &mask);
  if (sched_setaffinity(0, sizeof(mask), &mask))
    error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->recv_affinity);

#define RTE_MBUF_SIZE 20000
#define MIN_RX 10000

  for(int64_t i = 0;; i++) {

    struct rte_mbuf * pkt[RTE_MBUF_SIZE];
    uint8_t * buf;
#ifdef DPDK
    uint8_t * rtebuf;
    int port = 0;
#endif
    int nb_rx = 0;
    int n;
    int drop_packet = 0;
    int drop_total = 0;

#ifdef DPDK
    while(!nb_rx)
      nb_rx = rte_eth_rx_burst(port, 0, pkt + nb_rx, 1024);
#else
    // Limit packets sent
    if(recv_counter.counter >= target_counter) {
      clock_gettime(CLOCK_TAI, &current);
      if(!i || calcdiff_ns(current, previous) >= (1000 * 1000 * 10)) {
        target_counter += s->frame_frequency / 100;
        previous = current;
      }
    }
    if(recv_counter.counter < target_counter) {
      nb_rx = 1024;
      usleep(200);
    }
    else
      continue;
#endif

    if(nb_rx > RTE_MBUF_SIZE)
      log_exit("RECV_THREAD", "nb_rx (%d) > RTE_MBUF_SIZE (%d)", nb_rx, RTE_MBUF_SIZE);

    received_pkts = 1;

    n = rbuf_write_amount(&rx_rbuf);
    drop_packet = nb_rx > n;

    if(drop_packet) {
      // Not enough room for the whole burst: drop it entirely
      for(int j = 0; j < nb_rx; j++)
        rte_pktmbuf_free(pkt[j]);
      if(nb_rx)
        update_counter(&rx_drop_counter, nb_rx);
    }
    else {
      int nc; int nr; int k = 0;
      nr = nb_rx;
      // Copy in chunks that do not cross the end of rx_rbuf
      while((nc = rbuf_contiguous_copy(NULL, &rx_rbuf, nr))) {
        int drop = 0; // frames of this chunk skipped so far

        if(s->trace_rx) {
          if((recv_counter.counter + nc) >= (rx_rbuf.buf_len + s->trace_offset)) {
            rx_trace_ready = 1;
            log_info("RECV_THREAD", "RX Trace ready");
            pthread_exit(EXIT_SUCCESS);
          } else if (rx_trace_ready) {
            pthread_exit(EXIT_SUCCESS);
          }
        }

        buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.write_index * rx_rbuf.len);
        for(int j = 0; j < nc; j++) {
#ifdef DPDK
          uint16_t ether_type;
          rtebuf = (uint8_t *) (pkt[j + k])->buf_addr + (pkt[j + k])->data_off;

          // The ethertype is the 16-bit field at offset 12. Reading 32 bits there
          // also picks up the eCPRI header and never matches 0xAEFE.
          // Check it first so foreign frames do not disturb seq-id tracking.
          memcpy(&ether_type, rtebuf + 12, sizeof(ether_type));
          if(ntohs(ether_type) != 0xaefe) {
            drop++; drop_total++;
            continue;
          }
          if(first_seq_id) {
            uint16_t seq_id = htons(((uint16_t *) (rtebuf + 20))[0]);
            printf("seq_id = %d\n", seq_id);
            first_seq_id = 0;
          }
          if(s->master == 1) {
            // Keep rx_seq_id in step with the 16-bit wire sequence id
            uint16_t _rx_seq_id;
            uint16_t seq_id = htons(((uint16_t *) (rtebuf + 20))[0]);
            rx_seq_id++;
            _rx_seq_id = rx_seq_id % 65536;
            if(_rx_seq_id != seq_id) {
              if(_rx_seq_id > seq_id)
                rx_seq_id += 65536 + seq_id - _rx_seq_id;
              else
                rx_seq_id += seq_id - _rx_seq_id;
            }
          }

          if((pkt[j + k])->data_len != rx_rbuf.len) {
            for(int b = 0; b < pkt[j + k]->data_len; b++) {
              printf("%02x", rtebuf[b]);
            }
            printf("\n");
            log_error("RECV", "Packet data length (%d) != RX buffer len (%d)", pkt[j + k]->data_len, rx_rbuf.len);
          }
          // Compact: skipped frames must not leave holes in the ring
          memcpy(buf + (j - drop) * rx_rbuf.len, rtebuf, rx_rbuf.len);
#else
          //memcpy(buf + j * rx_rbuf.len, pkt_frame_full, rx_rbuf.len);
#endif
        }
        rx_rbuf.write_index = (rx_rbuf.write_index + nc - drop) % rx_rbuf.buf_len;
        for(int j = 0; j < nc; j++)
          rte_pktmbuf_free(pkt[j + k]);
        nr -= nc;
        k += nc;
      }
    }
    update_counter(&recv_counter, nb_rx - drop_total);

  }
  pthread_exit(EXIT_SUCCESS);
}

// Send as soon as packets are encoded
// Signal to encode thread that packets has been sent
static void *send_thread(void *p) {

  cpu_set_t mask;
  struct timespec initial;
  TRXEcpriState * s = (TRXEcpriState *) p;

  log_info("SEND_THREAD", "Thread init");
  // Set thread CPU affinity
  CPU_ZERO(&mask);
  CPU_SET(s->send_affinity, &mask);
  if (sched_setaffinity(0, sizeof(mask), &mask))
    error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->send_affinity);
  clock_gettime(CLOCK_TAI, &initial);

  for(int64_t i = 1;; i++) {

    int64_t n = rbuf_read_amount(&tx_rbuf);

    if(n >= BURST_SIZE) {
      int nb_burst = n / BURST_SIZE;

      for(int j = 0; j < nb_burst; j++) {

#ifdef DPDK
        struct rte_mbuf * pkt[TX_POOL_SIZE];
        struct rte_ether_hdr *eth_hdr;
        uint16_t nb_tx = 0;

        for(int i = 0; i < TX_POOL_SIZE; i++) {
          int pkt_size;
          pkt[i] = rte_pktmbuf_alloc(tx_mbuf_pool);
          eth_hdr = rte_pktmbuf_mtod(pkt[i], struct rte_ether_hdr*);
          eth_hdr->d_addr = d_addr;
          eth_hdr->s_addr = s_addr;
          eth_hdr->ether_type = htons(0xaefe);
          memcpy(rte_pktmbuf_mtod_offset(pkt[i], uint8_t *, sizeof(struct rte_ether_hdr)), RBUF_READ0(tx_rbuf, uint8_t), tx_rbuf.len);
          rbuf_update_read_index(&tx_rbuf);
          pkt_size = 14 + tx_rbuf.len;
          pkt[i]->data_len = pkt_size;
          pkt[i]->pkt_len = pkt_size;
        }
        while(nb_tx < TX_POOL_SIZE) {
          int64_t x = TX_POOL_SIZE - nb_tx;
          nb_tx += rte_eth_tx_burst(0, 0, pkt + nb_tx, x > BURST_SIZE ? BURST_SIZE : x);
        }

        /* Free any unsent packets. */
        if (nb_tx < BURST_SIZE) {
          uint16_t buf;
          for (buf = nb_tx; buf < BURST_SIZE; buf++)
            rte_pktmbuf_free(pkt[buf]);
          log_exit("SEND_THREAD", "Sent %d packets instead of %d", nb_tx, BURST_SIZE);
        }
#else
        for(int i = 0; i < 3000; i++)
          asm("NOP");
#endif
      }
      update_counter(&sent_counter, nb_burst * BURST_SIZE);
    }

  }
  pthread_exit(EXIT_SUCCESS);
}

/*
 * Fills the TX slot that *buf points into with a one-way delay measurement
 * message, then advances *buf by one TX slot (tx_rbuf.len bytes).
 *
 * Callers in this file pass a pointer 8 bytes past the start of a tx_rbuf
 * slot, so the negative offsets below address the bytes just before it.
 * The current CLOCK_TAI time (ts_to_int) is stored in one_way_timestamp and
 * written into the message.
 *
 * Returns the number of TX slots consumed (always 1).
 *
 * NOTE(review): the 64-bit timestamp covers *buf-2 .. *buf+5 and the
 * compensation field starts at *buf+7, so byte *buf+6 is never written.
 * Confirm these offsets against the message layout the peer expects.
 * NOTE(review): the 64-bit stores go through casted, possibly unaligned pointers.
 */
static int one_way_delay_measure(uint8_t ** buf) {
  struct timespec t;
  clock_gettime(CLOCK_TAI, &t);
  one_way_timestamp = ts_to_int(t);
  *(            (*buf - 7)) = 3;                   // eCPRI Type
  *(            (*buf - 4)) = 0;                   // Measurement ID
  *(            (*buf - 3)) = 0;                   // Action Type
  *((uint64_t *)(*buf - 2)) = one_way_timestamp;   // Timestamp
  *((uint64_t *)(*buf + 7)) = 0;                   // Compensation
  *buf += tx_rbuf.len;
  return 1;
}

/*
 * Appends n all-zero IQ frames to tx_rbuf, one per slot, with wrap-around.
 * Each frame gets:
 * - a one-way delay message in an extra slot, once every one_way_period
 *   frames when enabled;
 * - a TDD frame-start flag, if enabled;
 * - a timestamp, if enabled;
 * - the next eCPRI sequence id.
 *
 * trx: non-zero when the frames replace TRX samples. In that case the TDD flag
 * follows trx_encode_counter and trxw_rbuf[0] is advanced by n as if the
 * samples had been consumed.
 *
 * Does not check free space in tx_rbuf; callers are responsible for that.
 * NOTE(review): the TDD flag (1 byte) and the timestamp (8 bytes) are both
 * written at buf + 60 * tx_n_channel, so with both options enabled the
 * timestamp overwrites the flag.
 */
static void encode_empty_frames(int n, int trx, TRXEcpriState * s) {
  int slot = 0; // TX slots used so far (frames + one-way messages)

  for(int i = 0; i < n; i++) {
    uint8_t * buf;

    // ONE WAY DELAY MEASURE: one message every one_way_period frames
    // (matches the reservation made by encode_thread)
    if(s->one_way_measure && !((encode_counter.counter + i) % s->one_way_period)) {
      buf = RBUF_WRITE(tx_rbuf, slot, uint8_t) + 8;
      slot += one_way_delay_measure(&buf);
    }
    // Address every slot through the ring so a burst may wrap around
    buf = RBUF_WRITE(tx_rbuf, slot, uint8_t) + 8;

    // TDD FRAME START
    if(s->tdd_frame_start)
      *((uint8_t *)(buf + 60 * s->tx_n_channel)) = (trx && !((trx_encode_counter.counter + i) % s->tdd_period));
    // TIMESTAMP
    if(s->timestamp_frames) {
      int64_t ts;
      if(s->master)
        ts = gettime(0, sync_encode_counter++ + rxtx_shift, s->frame_frequency);
      else
        ts = gettime(tx_initial_ts, sync_encode_counter++, s->frame_frequency);
      memcpy(buf + 60 * s->tx_n_channel, &ts, sizeof(ts)); // may be unaligned
    }

    memset(buf, 0x00, 60 * s->tx_n_channel);
    *((uint16_t *)(buf - 2)) = htons(tx_seq_id++);
    slot++;
  }
  tx_rbuf.write_index = (tx_rbuf.write_index + slot) % tx_rbuf.buf_len;
  if(trx)
    trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + n) % trxw_rbuf[0].buf_len;
}
/*
 * Compresses n frames of IQ samples from trxw_rbuf (tx_n_channel rings, all
 * indexed by trxw_rbuf[0]) into tx_rbuf, one frame per slot with wrap-around,
 * and advances both rings.
 *
 * Per frame it writes:
 * - the optional one-way delay message, in an extra slot once every
 *   one_way_period frames;
 * - the optional TDD flag and timestamp;
 * - 60 encoded bytes per channel;
 * - the next eCPRI sequence id.
 *
 * The caller must ensure n frames are available in trxw_rbuf and that tx_rbuf
 * has room for them.
 * NOTE(review): the TDD flag and the timestamp share the same offset
 * (buf + 60 * tx_n_channel).
 */
static void encode_iq_samples(int n, TRXEcpriState * s) {
  int nc;
  int nf = n;

  // Only the trxw side needs contiguous slots; TX slots are addressed through the ring
  while((nc = rbuf_contiguous_copy(&trxw_rbuf[0], NULL, nf))) {
    Complex * iq_samples[4];
    const int done = n - nf; // frames of this call encoded by previous chunks
    int slot = 0;            // TX slots used by this chunk

    for(int ch = 0; ch < s->tx_n_channel; ch++)
      iq_samples[ch] = ((Complex *) trxw_rbuf[ch].buffer) + (trxw_rbuf[0].read_index * trxw_rbuf[0].len);

    for(int i = 0; i < nc; i++) {
      uint8_t * buf;

      // ONE WAY DELAY MEASURE: one message every one_way_period frames
      if(s->one_way_measure && !((encode_counter.counter + done + i) % s->one_way_period)) {
        buf = RBUF_WRITE(tx_rbuf, slot, uint8_t) + 8;
        slot += one_way_delay_measure(&buf);
      }
      buf = RBUF_WRITE(tx_rbuf, slot, uint8_t) + 8;

      // TDD FRAME START (frame index counted over the whole call, not per chunk)
      if(s->tdd_frame_start)
        *((uint8_t *)(buf + 60 * s->tx_n_channel)) = !((trx_encode_counter.counter + done + i) % s->tdd_period);
      // TIMESTAMP
      if(s->timestamp_frames) {
        int64_t ts;
        if(s->master)
          ts = gettime(0, sync_encode_counter++ + rxtx_shift, s->frame_frequency);
        else
          ts = gettime(tx_initial_ts, sync_encode_counter++, s->frame_frequency);
        memcpy(buf + 60 * s->tx_n_channel, &ts, sizeof(ts)); // may be unaligned
      }

      for(int ch = 0; ch < s->tx_n_channel; ch++)
        encode_s64_b60_2(buf + ch * 60, (float *) iq_samples[ch]);
      *((uint16_t *)(buf - 2)) = htons(tx_seq_id++);
      for(int ch = 0; ch < s->tx_n_channel; ch++)
        iq_samples[ch] += trxw_rbuf[0].len;
      slot++;
    }
    tx_rbuf.write_index = (tx_rbuf.write_index + slot) % tx_rbuf.buf_len;
    trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + nc) % trxw_rbuf[0].buf_len;
    nf -= nc;
  }
  if(nf)
    log_exit("ENCODE_THREAD", "%d IQ frames left unencoded", nf);
}

/*
 * Encodes up to n_max frames from the queued sample groups (trxw_group_rbuf),
 * either as IQ frames from trxw_rbuf or as empty frames (zeroes groups).
 *
 * A group with wait set holds an absolute trx_encode_counter target. It is
 * converted once into a relative count of empty frames, clamped at 0 if the
 * target has already passed. A group is popped when its count reaches 0.
 * Never encodes more frames than trxw_rbuf currently holds.
 * Exits the calling thread when the TX trace window is complete.
 *
 * Returns the number of frames encoded (0 .. n_max).
 */
int encode_trx(int n_max, TRXEcpriState * s) {
  int remain = n_max;

  while(remain && rbuf_read_amount(&trxw_rbuf[0]) && rbuf_read_amount(&trxw_group_rbuf)) {
    sample_group_t * g;
    int n_trx;
    int available;

    g = RBUF_READ0(trxw_group_rbuf, sample_group_t);
    if(g->wait) {
      g->wait = 0;
      g->count -= trx_encode_counter.counter;
      g->zeroes = 1;
      // Target already reached: nothing to pad (a negative count would corrupt the rings)
      if(g->count < 0)
        g->count = 0;
    }
    n_trx = g->count > remain ? remain : (int) g->count;
    // Both encoders consume trxw_rbuf slots; never overtake its write index
    available = rbuf_read_amount(&trxw_rbuf[0]);
    if(n_trx > available)
      n_trx = available;
    g->count -= n_trx;

    if(s->trace_tx) {
      if(sync_complete && (encode_counter.counter + n_trx) >= (tx_rbuf.buf_len + s->trace_offset)) {
        tx_trace_ready = 1;
        log_info("ENCODE_THREAD", "TX Trace ready");
        pthread_exit(EXIT_SUCCESS);
      } else if (tx_trace_ready) {
        pthread_exit(EXIT_SUCCESS);
      }
    }

    if(g->zeroes)
      encode_empty_frames(n_trx, 1, s);
    else
      encode_iq_samples(n_trx, s);

    if(!g->count) {
      rbuf_update_read_index(&trxw_group_rbuf);
    }
    update_counter(&trx_encode_counter, n_trx);
    remain -= n_trx;
  }
  return (n_max - remain);
}

/*
 * Encode thread scheduling:
 *   If sync has happened (= we have received frames):
 *     encode as soon as TRX has samples to write.
 *   Otherwise:
 *     encode as soon as there is space in the TX buffer.
 */
#define TX_SYNC_BURST_SIZE 512
#define MASTER_BURST 1024
/*
 * Encode loop.
 *
 * Slave mode: encodes whatever TRX samples are available, up to encode_burst
 * frames per iteration (or the free TX space if encode_burst is 0). It never
 * encodes more than the free TX space.
 *
 * Master mode: each iteration produces exactly encode_burst frames. Missing
 * TRX frames are replaced by empty frames. The loop then busy-waits until the
 * CLOCK_TAI time of the next burst, derived from tx_initial_ts and
 * frame_frequency. It exits the process if tx_rbuf lacks space for a burst.
 *
 * p: the TRXEcpriState configuration.
 */
static void *encode_thread(void *p) {

  cpu_set_t mask;
  TRXEcpriState * s = (TRXEcpriState *) p;
  int64_t target_counter = 0;
  struct timespec next;

  log_info("ENCODE_THREAD", "Thread init");
  // Set thread CPU affinity
  CPU_ZERO(&mask);
  CPU_SET(s->encode_affinity, &mask);
  if (sched_setaffinity(0, sizeof(mask), &mask))
    error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->encode_affinity);

  // Slave mode: first TS uses clock_gettime, then add fixed amount to timestamp
  // Master mode: Computes TX RX offset using received TS to set the TS
  //              At each beginning of TDD frame (or less often), send empty frames to align with RX
  for(int64_t i = 0;; i++) {
    int n, n_trx;
    int n_empty = 0;
    int n_min, n_max;

    if(s->master && !i) {
      // Time origin of the master TX schedule
      clock_gettime(CLOCK_TAI, &next);
      tx_initial_ts = ts_to_int(next);
    }

    n = rbuf_write_amount(&tx_rbuf);
    // Room for one burst plus the one-way measurement messages it may carry
    n_min = s->encode_burst;
    n_min += s->one_way_measure ? (s->encode_burst / s->one_way_period) + 1 : 0;
    if(s->master && n < n_min)
      log_exit("ENCODE_THREAD", "Not enough space in TX RBUF (%d < %d)", n, n_min);
    n_max = s->encode_burst ? s->encode_burst : n;
    // Never encode more than tx_rbuf can hold (no effect in master mode, checked above)
    if(n_max > n)
      n_max = n;

    n_trx = encode_trx(n_max, s);

    if(s->master) {
      struct timespec current;
      n_empty = s->encode_burst - n_trx;
      encode_empty_frames(n_empty, 0, s);
      target_counter += s->encode_burst;
      next = int_to_ts(gettime(tx_initial_ts, target_counter, s->frame_frequency));
      // Busy-wait until the next burst is due
      do {
        clock_gettime(CLOCK_TAI, &current);
      } while(current.tv_sec < next.tv_sec || (current.tv_sec == next.tv_sec && current.tv_nsec < next.tv_nsec));
    }

    update_counter(&encode_counter, n_trx + n_empty);
    update_counter(&empty_encode_counter, n_empty);
  }
  pthread_exit(EXIT_SUCCESS);
}
/*
 * Decode thread: decompresses frames from rx_rbuf into the per-channel IQ
 * rings trxr_rbuf (32 samples per channel per frame).
 *
 * With start_receiving set, and until recv_thread has seen a packet, it fakes
 * frame_frequency/100 frames every 10 ms by only advancing trxr_rbuf[0].
 * When real packets start arriving, decode_counter is reset once.
 *
 * Per received frame:
 * - eCPRI one-way measurement messages (type 3) are stored in one_way_rbuf.
 * - With timestamp_frames, the 64-bit timestamp after the IQ payload
 *   periodically updates rxtx_shift (master) or slave_ts_offset (slave).
 * When RX tracing is enabled, the thread exits once the capture window is full.
 *
 * p: the TRXEcpriState configuration.
 */
static void *decode_thread(void *p) {

  cpu_set_t mask;
  TRXEcpriState * s = (TRXEcpriState *) p;
  struct timespec next;
  int64_t target_counter = 0;
  int64_t sync_decode_counter = 0;
  int reset_decode_counter = 1;

  log_info("DECODE_THREAD", "Thread init");
  // Set thread CPU affinity
  CPU_ZERO(&mask);
  CPU_SET(s->decode_affinity, &mask);
  if (sched_setaffinity(0, sizeof(mask), &mask))
    error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->decode_affinity);

  for(int64_t i = 0;; i++) {
    int n, nc;
    if(s->start_receiving) {
      if(!received_pkts) {
        if(i == 0)
          clock_gettime(CLOCK_TAI, &next);
        // Limit packets sent
        if(decode_counter.counter > target_counter) {
          int k = (decode_counter.counter - target_counter + (s->frame_frequency / 100) - 1) / (s->frame_frequency / 100);
          // 64-bit product: k * 10 ms overflows int for k > 214
          add_ns(&next, (int64_t) k * 1000 * 1000 * 10); // 10ms to send 38400 packets
          clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
          target_counter += k * (s->frame_frequency / 100);
        }
        n = (s->frame_frequency / 100);
        for(int j = 0; j < n; j++)
          rbuf_update_write_index(&trxr_rbuf[0]);
        update_counter(&decode_counter, n);
        continue;
      }
      else if (reset_decode_counter) {
        if(s->trace_rx)
          decode_counter_prev = decode_counter.counter;
        decode_counter.counter = 0;
        reset_decode_counter = 0;
      }
    }

    while(!(n = rbuf_read_amount(&rx_rbuf))) {};
    while(rbuf_write_amount(&trxr_rbuf[0]) < n) {};

    while((nc = rbuf_contiguous_copy(&rx_rbuf, &trxr_rbuf[0], n))) {

      // Presumably skips the 14-byte Ethernet header plus 8 bytes, matching
      // the "+ 8" the encoder applies to tx_rbuf slots.
      uint8_t * buf = ((uint8_t *) rx_rbuf.buffer) + (rx_rbuf.read_index * rx_rbuf.len) + 22;
      Complex * iq_samples[4];

      if(s->trace_rx) {
        if(received_pkts && ((decode_counter.counter + nc) >= (trxr_rbuf[0].buf_len + s->trace_offset))) {
          rx_trace_ready = 1;
          log_info("DECODE_THREAD", "RX Trace ready");
          pthread_exit(EXIT_SUCCESS);
        } else if (rx_trace_ready) {
          pthread_exit(EXIT_SUCCESS);
        }
      }

      for(int ch = 0; ch < s->rx_n_channel; ch++)
        iq_samples[ch] = (((Complex *) trxr_rbuf[ch].buffer) + (trxr_rbuf[0].write_index * trxr_rbuf[0].len));

      for(int f = 0; f < nc; f++) {
        // All per-frame fields are relative to this frame, not to the chunk start
        uint8_t * frame = buf + f * rx_rbuf.len;

        // ONE WAY MEASURE
        // NOTE(review): the frame's trxr slot is left undecoded.
        // NOTE(review): offset 7 is where one_way_delay_measure writes the
        // compensation field, not its timestamp; confirm against the peer.
        if(s->one_way_measure && *(frame - 7) == 3) {
          uint64_t timestamp;
          memcpy(&timestamp, frame + 7, sizeof(timestamp));
          *(RBUF_WRITE0(one_way_rbuf, int64_t)) = ((int64_t) timestamp) - ((int64_t) one_way_timestamp);
          rbuf_update_write_index(&one_way_rbuf);
          continue;
        }
        // TIMESTAMP: 64-bit value written by the encoder right after the IQ payload
        if(s->master && s->timestamp_frames && !(sync_decode_counter % s->master_timestamp_check_period)) {
          int64_t timestamp;
          memcpy(&timestamp, frame + s->rx_n_channel * 60, sizeof(timestamp));
          // NOTE(review): gettime converts a frame count to ns, but the peer
          // produced this value with gettime (ns); confirm the intended units.
          rxtx_shift = gettime(0, timestamp, s->frame_frequency) - sync_decode_counter;
        }
        if(!s->master && s->timestamp_frames && !(sync_decode_counter % s->slave_timestamp_check_period)) {
          struct timespec t;
          int64_t timestamp;
          memcpy(&timestamp, frame + s->rx_n_channel * 60, sizeof(timestamp));
          clock_gettime(CLOCK_TAI, &t);
          slave_ts_offset = ts_to_int(t) - timestamp;
        }

        for(int ch = 0; ch < s->rx_n_channel ; ch++) {
          decode_s64_b60_2((float *) (iq_samples[ch] + f * 32), frame + ch * 60);
        }
        sync_decode_counter++;
      }

      trxr_rbuf[0].write_index = (trxr_rbuf[0].write_index + nc) % trxr_rbuf[0].buf_len;
      rx_rbuf.read_index = (rx_rbuf.read_index + nc) % rx_rbuf.buf_len;
      n -= nc;
      update_counter(&decode_counter, nc);
    }

  }
  pthread_exit(EXIT_SUCCESS);
}
static void *statistic_thread(void *p) {
  struct timespec next, initial;
  int64_t recv_stop = 0;
  cpu_set_t mask;
  TRXEcpriState * s = (TRXEcpriState *) p;
  FILE * stats_file_desc;

  log_info("STATISTIC_THREAD", "Thread init");

  char stats_file_name[256];
  memset(stats_file_name, '\0', 256);
  sprintf(stats_file_name, "%s/ecpri.stats", s->log_directory);
  stats_file_desc = fopen(stats_file_name, "w+");

  if(!stats_file_desc)
    error(EXIT_FAILURE, errno, "Couldn't open %s\n", stats_file_name);

  // Set thread CPU affinity
  CPU_ZERO(&mask);
  CPU_SET(s->statistic_affinity, &mask);
  if (sched_setaffinity(0, sizeof(mask), &mask))
    error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->statistic_affinity);

  clock_gettime(CLOCK_TAI, &initial);
  next = initial;
  for(int64_t i = 0;; i++) {
    add_ns(&next, s->statistics_refresh_rate_ns);
    if(s->trace_rx || s->trace_tx)
      trace_handler(initial, s);
    print_stats(stats_file_desc, (i % 50) == 0);
#ifdef DEBUG
    fprintf(stats_file_desc, 
        "%d %d %d %d %d %d %d %d\n", 
        rx_rbuf.write_index,
        rx_rbuf.read_index,
        trxr_rbuf[0].write_index,
        trxr_rbuf[0].read_index,
        trxw_rbuf[0].write_index,
        trxw_rbuf[0].read_index,
        tx_rbuf.write_index,
        tx_rbuf.read_index);
    fprintf(stats_file_desc, "TRXW RBUF: ri %d wi %d ra %d wa %d\n", trxw_rbuf[0].read_index, trxw_rbuf[0].write_index, rbuf_read_amount(&trxw_rbuf[0]), rbuf_write_amount(&trxw_rbuf[0]));
#endif
    fflush(stats_file_desc);

    update_counter_pps(&rx_drop_counter);
    update_counter_pps(&tx_drop_counter);
    update_counter_pps(&recv_counter);
    update_counter_pps(&decode_counter);
    update_counter_pps(&read_counter);
    update_counter_pps(&write_counter);
    update_counter_pps(&encode_counter);
    update_counter_pps(&trx_encode_counter);
    update_counter_pps(&sent_counter);
    update_counter_pps(&empty_encode_counter);
    if(s->monitor_pps) {
      if(recv_counter.pps > recv_pps_threshold) {
        recv_pps_threshold_hit = 1;
      }
      if(recv_pps_threshold_hit && recv_counter.pps < recv_pps_threshold) {
        struct timespec _ts;
        int64_t ts;
        clock_gettime(CLOCK_MONOTONIC, &_ts);
        ts = ts_to_int(_ts);
        if((recv_stop && ((ts - recv_stop) > (s->monitor_trigger_duration) * INT64_C(1000000000)))) {
          if(s->monitor_pps == 1)
            log_exit("MONITOR", "Stopped recieving packets, restarting...");
          log_info("MONITOR", "Stopped recieving packets, sending again...");
          sync_complete = 0;
          recv_stop = 0;
        }
        if(!recv_stop)
          recv_stop = ts;
      }
    }

    clock_nanosleep(CLOCK_TAI, TIMER_ABSTIME, &next, NULL);
  }
  pthread_exit(EXIT_SUCCESS);
}

/*
 * Create one worker thread running fn(arg) and detach it.
 * pthread_create() reports failure through its return value and does not
 * set errno, so that return value is what is handed to error(), which
 * terminates the process. The thread handle is not kept anywhere, so the
 * thread is detached to let its resources be reclaimed when it exits.
 */
static void ecpri_spawn_thread(void *(*fn)(void *), void *arg, const char *name)
{
  pthread_t thread;
  int ret = pthread_create(&thread, NULL, fn, arg);

  if (ret)
    error(EXIT_FAILURE, ret, "Couldn't create %s thread", name);
  pthread_detach(thread);
}

/*
 * Start the driver's worker threads, each receiving 's' as its argument.
 * Order and spacing: statistic, encode, decode and send 20 ms apart, then
 * recv 500 ms after send. Presumably this lets the rest of the pipeline run
 * before packets start arriving. Any creation failure terminates the
 * process; otherwise returns 0.
 *
 * Threads are created with default attributes (NULL). SCHED_FIFO with
 * PTHREAD_EXPLICIT_SCHED is not requested, because it requires real-time
 * privileges (CAP_SYS_NICE or RLIMIT_RTPRIO). A PTHREAD_STACK_MIN stack may
 * also be too small for these threads. statistic_thread() pins itself to a
 * CPU with sched_setaffinity(); the other threads presumably do the same.
 */
static int start_threads(TRXEcpriState * s) {
  log_info("TRX_ECPRI", "Starting threads");

  ecpri_spawn_thread(statistic_thread, s, "statistic");
  usleep(1000 * 20);
  ecpri_spawn_thread(encode_thread, s, "encode");
  usleep(1000 * 20);
  ecpri_spawn_thread(decode_thread, s, "decode");
  usleep(1000 * 20);
  ecpri_spawn_thread(send_thread, s, "send");
  usleep(1000 * 500);
  ecpri_spawn_thread(recv_thread, s, "recv");

  return 0;
}

/*
 * Split 'options' at spaces into an argv-style vector for the DPDK EAL.
 * argv[0] is an empty placeholder program name and argv[*pargc] is NULL.
 * Runs of spaces (leading, trailing or repeated) never produce empty
 * arguments, and the last token is kept whether or not the string ends with
 * a space. A NULL or empty 'options' yields only the placeholder
 * (*pargc == 1). The vector and its strings are heap-allocated and never
 * freed. Allocation failure terminates the process.
 */
static char **ecpri_split_dpdk_options(const char *options, int *pargc)
{
  size_t max_args = 2; // placeholder argv[0] + terminating NULL
  char **argv;
  int argc = 1;

  if (options) {
    for (const char *c = options; *c != '\0'; c++)
      if (*c != ' ' && (c == options || c[-1] == ' '))
        max_args++;
  }

  argv = malloc(max_args * sizeof(*argv));
  if (!argv)
    error(EXIT_FAILURE, errno, "Couldn't allocate DPDK argument vector");
  argv[0] = "";

  while (options && *options != '\0') {
    const char *start;
    size_t len;

    if (*options == ' ') {
      options++;
      continue;
    }
    start = options;
    while (*options != '\0' && *options != ' ')
      options++;
    len = (size_t) (options - start);
    argv[argc] = malloc(len + 1);
    if (!argv[argc])
      error(EXIT_FAILURE, errno, "Couldn't allocate DPDK argument");
    memcpy(argv[argc], start, len);
    argv[argc][len] = '\0';
    argc++;
  }
  argv[argc] = NULL;
  *pargc = argc;
  return argv;
}

/*
 * Bring up DPDK and the driver data path for state 's':
 *  1. split s->dpdk_options into EAL arguments and call init_dpdk();
 *  2. reset all counters and allocate the RX/TX frame ring buffers, the
 *     per-channel sample ring buffers, the write-group and one-way buffers;
 *  3. parse s->re_mac into d_addr and s->rec_mac into s_addr (a bad or
 *     missing address is reported on stderr and otherwise ignored);
 *  4. copy an eCPRI header template into every TX frame slot;
 *  5. start the worker threads.
 * Returns 0; fatal problems terminate the process.
 */
int startdpdk(TRXEcpriState * s) {
  uint8_t ecpri_message[MAX_PACKET_SIZE] = {0};
  uint16_t be16;
  int argc;
  char ** argv;

#ifndef DPDK
  for(int i = 0; i < 262; i++)
    pkt_frame_full[i] = 0xff;
#endif

  argv = ecpri_split_dpdk_options(s->dpdk_options, &argc);
#ifdef DPDK
  init_dpdk(argc, argv);
#endif
  log_info("TRX_ECPRI", "Start");

  //set_latency_target();

  tx_seq_id = 0;
  init_counter(&rx_drop_counter);
  init_counter(&tx_drop_counter);
  init_counter(&recv_counter);
  init_counter(&decode_counter);
  init_counter(&read_counter);
  init_counter(&write_counter);
  init_counter(&encode_counter);
  init_counter(&trx_encode_counter);
  init_counter(&sent_counter);
  init_counter(&empty_encode_counter);

  RBUF_INIT(rx_rbuf, "RX ring buffer", s->txrx_buf_size, s->rx_n_channel * 60 + 22 + s->tdd_frame_start + s->timestamp_frames * sizeof(uint64_t), uint8_t);
  RBUF_INIT(tx_rbuf, "TX ring buffer", s->txrx_buf_size, s->tx_n_channel * 60 + 8  + s->tdd_frame_start + s->timestamp_frames * sizeof(uint64_t), uint8_t);
  // NOTE(review): 'name' is a loop-local buffer. If RBUF_INIT stores the
  // pointer instead of copying the string, it dangles after each iteration.
  // Confirm against the RBUF_INIT definition.
  for(int i = 0; i < s->tx_n_channel; i++) {
    char name[256];
    sprintf(name, "TRXWrite Ring Buffer %d", i);
    RBUF_INIT(trxw_rbuf[i], name, s->trx_buf_size, N_SAMPLES, Complex);
  }
  for(int i = 0; i < s->rx_n_channel; i++) {
    char name[256];
    sprintf(name, "TRXRead Ring Buffer %d", i);
    RBUF_INIT(trxr_rbuf[i], name, s->trx_buf_size, N_SAMPLES, Complex);
  }
  RBUF_INIT(trxw_group_rbuf, "TRXGroupWrite ring buffer", TRX_MAX_GROUP, 1, sample_group_t);

  RBUF_INIT(one_way_rbuf, "One-way ring buffer", 1024, 1, int64_t);

  // ecpri_message is the template copied into every TX slot below; a slot
  // longer than the template would read past the end of the array.
  if(tx_rbuf.len > MAX_PACKET_SIZE)
    error(EXIT_FAILURE, 0, "TX frame length %d exceeds MAX_PACKET_SIZE (%d)",
        (int) tx_rbuf.len, MAX_PACKET_SIZE);

#ifdef DPDK
  if(!s->re_mac || sscanf((char *) s->re_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c",
        &d_addr.addr_bytes[0],
        &d_addr.addr_bytes[1],
        &d_addr.addr_bytes[2],
        &d_addr.addr_bytes[3],
        &d_addr.addr_bytes[4],
        &d_addr.addr_bytes[5]) != 6)
    fprintf(stderr, "Invalid eRE MAC address\n");
  if(!s->rec_mac || sscanf((char *) s->rec_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c",
        &s_addr.addr_bytes[0],
        &s_addr.addr_bytes[1],
        &s_addr.addr_bytes[2],
        &s_addr.addr_bytes[3],
        &s_addr.addr_bytes[4],
        &s_addr.addr_bytes[5]) != 6)
    fprintf(stderr, "Invalid eREC MAC address\n");
#endif

  /* Standard Header */
  ecpri_message[0] = 0x10; // Protocol data revision 0x1, C = 0

  // Message type = 0x00, IQ data (byte 1 stays zero).
  // 16-bit big-endian fields are stored with memcpy: a uint16_t* cast into a
  // byte array violates strict aliasing and may be misaligned.
  // NOTE(review): payload size is fixed at 244 (= 4 * 60 + 4), which only
  // matches tx_n_channel == 4 if each channel carries 60 bytes plus a
  // 4-byte PC_ID/SEQ_ID. Confirm whether the send path rewrites it.
  be16 = htons(244);
  memcpy(ecpri_message + 2, &be16, sizeof(be16));
  be16 = htons(s->flow_id);
  memcpy(ecpri_message + 4, &be16, sizeof(be16));

  for(int i = 0; i < tx_rbuf.buf_len; i++)
    memcpy(((uint8_t *) tx_rbuf.buffer) + (i * tx_rbuf.len), ecpri_message, tx_rbuf.len);

  start_threads(s);
  return 0;
}

/*
 * Driver teardown hook (registered as trx_end_func): releases the
 * TRXEcpriState allocated by trx_driver_init() and clears s1->opaque so it
 * no longer points at freed memory.
 *
 * NOTE(review): start_threads() hands the same state to the worker threads,
 * and nothing here stops them (statistic_thread() loops forever reading it).
 * Any of them still running after this call would use freed memory. Confirm
 * that the process exits right after trx_end_func.
 */
static void trx_ecpri_end(TRXState *s1)
{
  log_info("TRX_ECPRI", "End");
  TRXEcpriState *s = s1->opaque;
  free(s);
  s1->opaque = NULL;
}

/* State of the previous trx_ecpri_write() call, used to detect gaps:
 * the timestamp (already divided by M) and the number of M-sample blocks
 * it wrote. Static storage starts at zero. */
static int64_t prev_count;
static int64_t prev_ts;
/* Block size in samples: read/write sample counts and timestamps are
 * divided by (or multiplied by) M to convert to/from blocks. */
#define M 32

/*
 * TX path, registered as trx_write_func2.
 *
 * Takes 'count' samples per channel starting at sample time 'timestamp' and
 * works in blocks of M samples:
 *  - exits via log_exit() when this write does not directly follow the
 *    previous one (gap detection on prev_ts / prev_count);
 *  - drops the whole write (counted in tx_drop_counter) when trxw_rbuf[0]
 *    does not have room for it;
 *  - with TX tracing enabled, calls pthread_exit() once enough blocks have
 *    been written, which terminates the *calling* thread;
 *  - queues a sample_group_t in trxw_group_rbuf describing the write (on
 *    the first write, presumably while first_trx_write is set, a leading
 *    "wait" group carrying the start timestamp);
 *  - copies the samples of each of the s->tx_n_channel channels into
 *    trxw_rbuf[i]. When __samples is NULL, nothing is copied and the group
 *    is marked zeroes = 1.
 * tx_port_index and md are unused.
 */
static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void **__samples, int count, int tx_port_index, TRXWriteMetadata *md)
{
  int write_count; int64_t ts; sample_group_t * g; int nc; int nk = 0;
  float ** _samples = (float **) __samples;
  TRXEcpriState *s = s1->opaque;
  write_count = count / M;
  ts = timestamp / M;

  if(count)
    min_count_trx = count < min_count_trx ? count : min_count_trx;

  log_debug("TRX_ECPRI_WRITE", "trx_ecpri_write, count = %d", count);
  
  // Conversion specifiers must match the argument types exactly: count is
  // an int, the others are int64_t.
  if(prev_count && ((ts - prev_ts) != prev_count)) {
    log_exit("TRX_ECPRI_WRITE",
        "Gap between timestamps: prev_ts %" PRId64 " ts %" PRId64 " prev_count %" PRId64 " count %d diff_ts %" PRId64,
        prev_ts, ts, prev_count, count, (ts - prev_ts));
  }
  prev_ts = ts; prev_count = write_count;

  if(write_count > rbuf_write_amount(&trxw_rbuf[0])) {
    //log_exit("TRX_ECPRI_WRITE", "Not enough space to write in trxw_rbuf (write count = %d)", write_count);
    update_counter(&tx_drop_counter, write_count);
    return;
  }

  if(s->trace_tx) {
    if((write_counter.counter + write_count) >= (trxw_rbuf[0].buf_len + s->trace_offset)) {
      tx_trace_ready = 1;
      log_info("TRX_ECPRI_WRITE", "TX Trace ready");
      pthread_exit(EXIT_SUCCESS);
    } else if (tx_trace_ready) {
      pthread_exit(EXIT_SUCCESS);
    }
  }

  if(first_trx_write) {
    sample_group_t * g2 = RBUF_WRITE0(trxw_group_rbuf, sample_group_t);
    g2->count = ts;
    g2->wait = 1;
    g2->zeroes = 1;
    rbuf_update_write_index(&trxw_group_rbuf);
  }
  g = RBUF_WRITE0(trxw_group_rbuf, sample_group_t);
  g->zeroes = __samples ? 0 : 1;
  g->wait = 0;
  g->count = write_count;

  // Copy in contiguous chunks: a write that wraps around the ring takes two
  // iterations. nk counts the blocks already consumed from the caller buffers.
  while((nc = rbuf_contiguous_copy(NULL, &trxw_rbuf[0], write_count))) {
    // Block size is taken from the buffer being written (trxw_rbuf).
    size_t block_bytes = (size_t) trxw_rbuf[0].len * sizeof(Complex);
    size_t len = (size_t) nc * block_bytes;
    if(__samples)
      for(int i = 0; i < s->tx_n_channel; i++) {
        uint8_t * src = ((uint8_t *) _samples[i]) + (size_t) nk * block_bytes;
        uint8_t * dst = ((uint8_t *) trxw_rbuf[i].buffer) + (size_t) trxw_rbuf[0].write_index * block_bytes;
        memcpy(dst, src, len);
      }
    trxw_rbuf[0].write_index = (trxw_rbuf[0].write_index + nc) % trxw_rbuf[0].buf_len;
    write_count -= nc;
    nk += nc;
  }

  rbuf_update_write_index(&trxw_group_rbuf);
  update_counter(&write_counter, count / M);
}

/*
 * RX path, registered as trx_read_func2.
 *
 * Busy-waits until trxr_rbuf[0] holds count / M blocks and sets
 * sync_complete. It then copies those blocks of each of the s->rx_n_channel
 * channels into __samples[i], unless s->disable_rx_copy or s->trx_read_null
 * is set, in which case the ring buffer is consumed without copying.
 * *ptimestamp receives the sample time of the first returned sample
 * (blocks read so far * M). Returns 'count'. rx_port_index and md are unused.
 */
static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__samples, int count, int rx_port_index, TRXReadMetadata *md)
{
  int nc; int n;
  float ** _samples = (float **) __samples;
  int read_count = (count / M);
  size_t offset = 0; // byte offset into every caller buffer
  TRXEcpriState *s = s1->opaque;

  log_debug("TRX_ECPRI_READ", "count = %d (%" PRId64 ")", read_count, (int64_t) read_counter.counter);

  // Spin until the decode side has produced enough blocks.
  while(rbuf_read_amount(&trxr_rbuf[0]) < read_count);
  
  sync_complete = 1;

  // Copy in contiguous chunks: a read that wraps around the ring takes two
  // iterations, and the second one continues at 'offset' in the caller buffer.
  n = read_count;
  while((nc = rbuf_contiguous_copy(&trxr_rbuf[0], NULL, n))) {
    size_t len = (size_t) nc * trxr_rbuf[0].len * sizeof(Complex);
    for(int i = 0; i < s->rx_n_channel; i++ ) {
      // 'offset' is in bytes, so apply it after converting to a byte pointer.
      // Adding it to the float pointer would land 4x too far.
      uint8_t * dst = ((uint8_t *) _samples[i]) + offset;
      uint8_t * src = ((uint8_t *) trxr_rbuf[i].buffer) + (size_t) trxr_rbuf[0].read_index * trxr_rbuf[0].len * sizeof(Complex);
      if(!s->disable_rx_copy && !s->trx_read_null)
        memcpy(dst, src, len);
    }
    trxr_rbuf[0].read_index = (trxr_rbuf[0].read_index + nc) % trxr_rbuf[0].buf_len;
    n -= nc;
    offset += len;
  }

  *ptimestamp = (read_counter.counter) * M;
  update_counter(&read_counter, read_count);

  return count;
}

/*
 * Automatic sample-rate selection hook. This driver does not implement it
 * and always returns -1, so the sample rate has to be forced through the
 * "sample_rate" configuration option. All arguments are ignored.
 */
static int trx_ecpri_get_sample_rate(TRXState *s1, TRXFraction *psample_rate,
    int *psample_rate_num, int sample_rate_min)
{
  (void) s1;
  (void) psample_rate;
  (void) psample_rate_num;
  (void) sample_rate_min;
  return -1;
}

/*
 * Start hook (registered as trx_start_func). Logs the API version, config
 * path and configured MAC/interface strings, then stores the RF port 0
 * sample rate (num / den, integer division) in the driver state and brings
 * up DPDK and the worker threads via startdpdk(). Always returns 0.
 */
static int trx_ecpri_start(TRXState *s1, const TRXDriverParams *params)
{
  TRXEcpriState *state = s1->opaque;
  const char *start_tag = "TRX_ECPRI_START";

  log_info(start_tag, "Start");
  log_info(start_tag, "trx_api_version: %d", s1->trx_api_version);
  log_info(start_tag, "config file: %s", s1->path);
  log_info("TEST-DPDK-ECPRI", "rec-mac: %s, re-mac: %s, rec-if: %s",
      state->rec_mac, state->re_mac, state->rec_if);

  state->sample_rate = params->sample_rate[0].num / params->sample_rate[0].den;

  startdpdk(state);
  return 0;
}

/*
 * Alternative start-up path that bypasses trx_driver_init() and
 * trx_ecpri_start(). It attaches the caller-filled state 's' to s1,
 * registers the read/write callbacks and calls startdpdk(s).
 * 's' is assumed to be fully configured by the caller (TODO confirm).
 */
void dummy_enb_init(TRXState *s1, TRXEcpriState *s) {
  // trx_ecpri_write()/trx_ecpri_read() fetch their state from s1->opaque,
  // so it must point at 's' before they can be called.
  s1->opaque = s;
  s1->trx_write_func2 = trx_ecpri_write;
  s1->trx_read_func2 = trx_ecpri_read;
  startdpdk(s);
}

/*
 * Transceiver start callback: should return 0 on success, < 0 on error.
 * This stub only logs and reports success. It is not registered (this
 * driver registers trx_ecpri_start() instead).
 */
int trx_start_func(TRXState *s, const TRXDriverParams *p) {
  (void) s;
  (void) p;
  log_info("DEBUG", "trx_start_func");
  return 0;
}

/*
 * Deprecated legacy write callback (superseded by trx_write_func2).
 * API contract: write 'count' samples on each channel of TX port
 * 'tx_port_index', samples[0] being the first channel's array and
 * 'timestamp' the time (in samples) at which the first sample must be
 * sent. With TRX_WRITE_FLAG_PADDING, samples is NULL and nothing is to be
 * sent (TDD receive time). TRX_WRITE_FLAG_END_OF_BURST announces that the
 * next call will carry TRX_WRITE_FLAG_PADDING. The two flags are never set
 * together.
 *
 * This driver registers trx_ecpri_write() as trx_write_func2. This stub
 * only logs and ignores its arguments.
 */
void trx_write_func(TRXState *s, trx_timestamp_t timestamp, const void **samples, int count, int flags, int tx_port_index) {
  (void) s;
  (void) timestamp;
  (void) samples;
  (void) count;
  (void) flags;
  (void) tx_port_index;
  log_info("DEBUG", "**");
}

/*
 * Deprecated legacy read callback (superseded by trx_read_func2).
 * API contract: read 'count' samples from each channel (samples[0] is the
 * first channel's array), store in *ptimestamp the time at which the first
 * sample was received, and return the number of samples read (= count).
 * The application may call the write, read and gain callbacks from
 * different threads.
 *
 * This driver registers trx_ecpri_read() as trx_read_func2. This stub only
 * logs, leaves *ptimestamp and the buffers untouched, and returns 0.
 */
int trx_read_func(TRXState *s, trx_timestamp_t *ptimestamp, void **samples, int count, int rx_port_index) {
  (void) s;
  (void) ptimestamp;
  (void) samples;
  (void) count;
  (void) rx_port_index;
  log_info("DEBUG", "**");
  return 0;
}

/*
 * Dynamically set the transmit gain (in dB) of 'channel_num'; origin and
 * range are driver dependent. Used only for user-requested adjustments.
 * Stub: logs and ignores the request.
 */
void trx_set_tx_gain_func(TRXState *s, double gain, int channel_num) {
  (void) s;
  (void) gain;
  (void) channel_num;
  log_info("DEBUG", "trx_set_tx_gain_func");
}

/*
 * Dynamically set the receive gain (in dB) of 'channel_num'; origin and
 * range are driver dependent. Used only for user-requested adjustments.
 * Stub: logs and ignores the request.
 */
void trx_set_rx_gain_func(TRXState *s, double gain, int channel_num) {
  (void) s;
  (void) gain;
  (void) channel_num;
  log_info("DEBUG", "trx_set_rx_gain_func");
}

/*
 * Optional callback: maximum number of samples per TX packet, queried by
 * the application after trx_start_func. Stub: logs and returns 0 (how the
 * application interprets 0 is not visible here).
 */
int trx_get_tx_samples_per_packet_func(TRXState *s) {
  (void) s;
  log_info("DEBUG", "trx_get_tx_samples_per_packet_func");
  return 0;
}

/*
 * Statistics callback. Contract: fill *m and return 0, or return < 0 when
 * statistics are not available. This driver never fills *m, so it reports
 * "not available" (-1). Returning 0 would tell a caller that the untouched
 * struct is valid. Currently not registered in trx_driver_init().
 */
int trx_get_stats(TRXState *s, TRXStatistics *m) {
  (void) s;
  (void) m;
  log_info("DEBUG", "trx_get_stats");
  return -1;
}

/*
 * Info-dump callback: meant to produce text for display through 'cb'.
 * Stub: logs only and never calls 'cb'.
 */
void trx_dump_info(TRXState *s, trx_printf_cb cb, void *opaque) {
  (void) s;
  (void) cb;
  (void) opaque;
  log_info("DEBUG", "*opaque");
}

/*
 * Absolute TX power in dBm for TX channel 'channel_num', assuming a square
 * signal of maximum amplitude. May be called from any thread and must be
 * fast. Contract: store the value in *presult and return 0, or return -1
 * when it is not available. This driver has no calibration data and never
 * writes *presult, so it returns -1. Currently not registered.
 */
int trx_get_abs_tx_power_func(TRXState *s,
                                 float *presult, int channel_num) {
  (void) s;
  (void) presult;
  (void) channel_num;
  log_info("DEBUG", "trx_get_abs_tx_power_func");
  return -1;
}

/*
 * Absolute RX power in dBm for RX channel 'channel_num', assuming a square
 * signal of maximum amplitude. May be called from any thread and must be
 * fast. Contract: store the value in *presult and return 0, or return -1
 * when it is not available. This driver never writes *presult, so it
 * returns -1. Currently not registered.
 */
int trx_get_abs_rx_power_func(TRXState *s,
                                 float *presult, int channel_num) {
  (void) s;
  (void) presult;
  (void) channel_num;
  log_info("DEBUG", "trx_get_abs_rx_power_func");
  return -1;
}

/*
 * Remote API messaging (available since API v14):
 *  - trx_msg_recv_func is called for every TRX message received;
 *  - trx_msg_send_func is used to send TRX messages (they must be
 *    registered by the client), one send call per message.
 * Receive stub: logs and ignores 'msg'.
 */
void trx_msg_recv_func(TRXState *s, TRXMsg *msg) {
  (void) s;
  (void) msg;
  log_info("DEBUG", "trx_msg_recv_func");
}

/* Remote API send stub: logs and returns no message (NULL). */
TRXMsg* trx_msg_send_func(TRXState *s) {
  TRXMsg *none = NULL;
  (void) s;
  log_info("DEBUG", "trx_msg_send_func");
  return none;
}

/*
 * Report the current transmit gain (in dB) of 'channel_num' through *gain;
 * origin and range are driver dependent.
 * NOTE: this stub only logs and does not write *gain.
 */
void trx_get_tx_gain_func(TRXState *s, double *gain, int channel_num) {
  (void) s;
  (void) gain;
  (void) channel_num;
  log_info("DEBUG", "trx_get_tx_gain_func");
}

/*
 * Report the current receive gain (in dB) of 'channel_num' through *gain;
 * origin and range are driver dependent.
 * NOTE: this stub only logs and does not write *gain.
 */
void trx_get_rx_gain_func(TRXState *s, double *gain, int channel_num) {
  (void) s;
  (void) gain;
  (void) channel_num;
  log_info("DEBUG", "trx_get_rx_gain_func");
}

/*
 * Stop the transceiver after trx_start. Resources allocated at init are
 * kept so that it can be started again. Stub: logs only.
 */
void trx_stop_func(TRXState *s) {
  (void) s;
  log_info("DEBUG", "trx_stop_func");
}

/* OFDM mode: experimental 7.2 API */

/*
 * Read the current timestamp of 'port_index' (OFDM mode only).
 * Contract: store it in *ptimestamp and return 0, or return < 0 if the
 * device does not support it. This driver does not implement OFDM mode and
 * never writes *ptimestamp, so it returns -1 ("not supported") instead of
 * claiming success. Currently not registered.
 */
int trx_read_timestamp(TRXState *s, trx_timestamp_t *ptimestamp,
                          int port_index) {
  (void) s;
  (void) ptimestamp;
  (void) port_index;
  log_info("DEBUG", "trx_read_timestamp");
  return -1;
}

/* OFDM mode: configure the TX streams of an RF port. Stub: logs only. */
void trx_set_tx_streams(TRXState *s, int rf_port_index,
                           int n_streams, const TRXOFDMStreamInfo *streams) {
  (void) s;
  (void) rf_port_index;
  (void) n_streams;
  (void) streams;
  log_info("DEBUG", "trx_set_tx_streams");
}
/* OFDM mode: configure the RX streams of an RF port. Stub: logs only. */
void trx_set_rx_streams(TRXState *s, int rf_port_index,
                           int n_streams, const TRXOFDMStreamInfo *streams) {
  (void) s;
  (void) rf_port_index;
  (void) n_streams;
  (void) streams;
  log_info("DEBUG", "trx_set_rx_streams");
}
/*
 * OFDM mode: schedule the reading of 'n_symbols' symbols on an RF port.
 * Should return 0 if OK, < 0 on error. Stub: logs, schedules nothing and
 * returns 0.
 */
int trx_schedule_read(TRXState *s, int rf_port_index,
                         const TRXScheduledSymbol *symbols,
                         int n_symbols) {
  (void) s;
  (void) rf_port_index;
  (void) symbols;
  (void) n_symbols;
  log_info("DEBUG", "trx_schedule_read");
  return 0;
}

/* AGC (automatic gain control) callbacks */

/* Apply AGC parameters 'p' to 'channel'. Stub: logs, ignores 'p', returns 0. */
int trx_set_agc_func(TRXState *s, const TRXAGCParams *p, int channel) {
  (void) s;
  (void) p;
  (void) channel;
  log_info("DEBUG", "trx_set_agc_func");
  return 0;
}
/*
 * Read back the AGC parameters of 'channel' into *p.
 * NOTE: this stub only logs and returns 0; *p is not written.
 */
int trx_get_agc_func(TRXState *s, TRXAGCParams *p, int channel) {
  (void) s;
  (void) p;
  (void) channel;
  log_info("DEBUG", "trx_get_agc_func");
  return 0;
}

/*
 * Read the numeric configuration parameter 'name' and truncate it to int.
 * 'val' starts at 0 and the lookup status is checked, so a missing
 * parameter yields 0 rather than whatever a previous lookup left behind.
 * (Assumes trx_get_param_double() returns < 0 when the parameter is
 * absent. TODO confirm against private/trx_driver.h.)
 */
static int trx_ecpri_get_int_param(TRXState *s1, const char *name)
{
  double val = 0.0;

  if (trx_get_param_double(s1, &val, name) < 0)
    return 0;
  return (int) val;
}

/*
 * Driver entry point, called by the application when loading the driver.
 *
 * Locks process memory, checks the TRX ABI version (returns -1 on mismatch)
 * and allocates a zeroed TRXEcpriState (returns -1 if allocation fails).
 * It then reads every numeric parameter (missing ones become 0) and the
 * string parameters. Finally it registers the end/read/write/start/
 * sample-rate callbacks with s->opaque pointing at the new state.
 * Terminates via log_exit() when ecpri_period, rx_n_channel or tx_n_channel
 * is 0, or when a required string parameter is missing. Returns 0 on
 * success.
 */
int trx_driver_init(TRXState *s1)
{
  TRXEcpriState *s;

  // Lock all current and future pages to keep them from being paged out to swap
  if (mlockall(MCL_CURRENT | MCL_FUTURE)) {
    log_error("TRX_ECPRI", "mlockall failed");
  }

  log_info("TRX_ECPRI", "Init");

  if (s1->trx_api_version != TRX_API_VERSION) {
    fprintf(stderr, "ABI compatibility mismatch between LTEENB and TRX driver (LTEENB ABI version=%d, TRX driver ABI version=%d)\n",
        s1->trx_api_version, TRX_API_VERSION);
    return -1;
  }

  // calloc: every field not set below must start at 0.
  s = calloc(1, sizeof(*s));
  if (!s) {
    log_error("TRX_ECPRI", "Couldn't allocate driver state");
    return -1;
  }

  s->recv_affinity = trx_ecpri_get_int_param(s1, "recv_affinity");
  s->send_affinity = trx_ecpri_get_int_param(s1, "send_affinity");
  s->encode_affinity = trx_ecpri_get_int_param(s1, "encode_affinity");
  s->decode_affinity = trx_ecpri_get_int_param(s1, "decode_affinity");
  s->statistic_affinity = trx_ecpri_get_int_param(s1, "statistic_affinity");
  s->flow_id = trx_ecpri_get_int_param(s1, "flow_id");
  s->frame_frequency = trx_ecpri_get_int_param(s1, "frame_frequency");
  s->trx_buf_size = trx_ecpri_get_int_param(s1, "trx_buf_size");
  s->txrx_buf_size = trx_ecpri_get_int_param(s1, "txrx_buf_size");
  s->trace_rx = trx_ecpri_get_int_param(s1, "trace_rx");
  s->trace_tx = trx_ecpri_get_int_param(s1, "trace_tx");
  s->trace_offset = trx_ecpri_get_int_param(s1, "trace_offset");
  s->monitor_pps = trx_ecpri_get_int_param(s1, "monitor_pps");
  s->monitor_trigger_duration = trx_ecpri_get_int_param(s1, "monitor_trigger_duration");
  s->start_receiving = trx_ecpri_get_int_param(s1, "start_receiving");
  s->rx_n_channel = trx_ecpri_get_int_param(s1, "rx_n_channel");
  s->tx_n_channel = trx_ecpri_get_int_param(s1, "tx_n_channel");
  s->master = trx_ecpri_get_int_param(s1, "master");
  s->one_way_measure = trx_ecpri_get_int_param(s1, "one_way_measure");
  s->one_way_period = trx_ecpri_get_int_param(s1, "one_way_period");
  s->tdd_frame_start = trx_ecpri_get_int_param(s1, "tdd_frame_start");
  s->timestamp_frames = trx_ecpri_get_int_param(s1, "timestamp_frames");
  s->master_timestamp_check_period = trx_ecpri_get_int_param(s1, "master_timestamp_check_period");
  s->slave_timestamp_check_period = trx_ecpri_get_int_param(s1, "slave_timestamp_check_period");
  s->trx_read_null = trx_ecpri_get_int_param(s1, "trx_read_null");
  s->tdd_period = trx_ecpri_get_int_param(s1, "tdd_period");
  s->encode_burst = trx_ecpri_get_int_param(s1, "encode_burst");
  s->send_burst = trx_ecpri_get_int_param(s1, "send_burst");
  s->statistics_refresh_rate_ns = trx_ecpri_get_int_param(s1, "statistics_refresh_rate_ns");
  s->ecpri_period = trx_ecpri_get_int_param(s1, "ecpri_period");
  s->disable_rx_copy = trx_ecpri_get_int_param(s1, "disable_rx_copy");

  if(s->ecpri_period == 0)
    log_exit("TRX_ECPRI", "ecpri_period parameter can't be null\n");
  if(s->rx_n_channel == 0)
    log_exit("TRX_ECPRI", "rx_n_channel parameter can't be null\n");
  if(s->tx_n_channel == 0)
    log_exit("TRX_ECPRI", "tx_n_channel parameter can't be null\n");

  s->re_mac = (uint8_t *) trx_get_param_string(s1, "re_mac");
  s->rec_mac = (uint8_t *) trx_get_param_string(s1, "rec_mac");
  s->rec_if = (uint8_t *) trx_get_param_string(s1, "rec_if");
  s->dpdk_options = trx_get_param_string(s1, "dpdk_options");
  s->log_directory = (uint8_t *) trx_get_param_string(s1, "log_directory");

  // Needed later: MAC parsing and EAL arguments in startdpdk(), and the
  // stats file path in statistic_thread(). Fail early with a clear message.
  if(!s->re_mac)
    log_exit("TRX_ECPRI", "re_mac parameter is missing\n");
  if(!s->rec_mac)
    log_exit("TRX_ECPRI", "rec_mac parameter is missing\n");
  if(!s->dpdk_options)
    log_exit("TRX_ECPRI", "dpdk_options parameter is missing\n");
  if(!s->log_directory)
    log_exit("TRX_ECPRI", "log_directory parameter is missing\n");

  // The receive-rate monitor in statistic_thread() compares against 90 %
  // of the configured frame frequency.
  recv_pps_threshold = (s->frame_frequency * 9 / 10);

  s1->opaque = s;
  s1->trx_end_func = trx_ecpri_end;
  s1->trx_write_func2 = trx_ecpri_write;
  s1->trx_read_func2 = trx_ecpri_read;
  s1->trx_start_func = trx_ecpri_start;
  s1->trx_get_sample_rate_func = trx_ecpri_get_sample_rate;

  //s1->trx_write_func = trx_write_func;
  //s1->trx_read_func = trx_read_func;
  //s1->trx_set_tx_gain_func = trx_set_tx_gain_func;
  //s1->trx_set_rx_gain_func = trx_set_rx_gain_func;
  //s1->trx_get_tx_samples_per_packet_func = trx_get_tx_samples_per_packet_func;
  //s1->trx_get_stats = trx_get_stats;
  //s1->trx_dump_info = trx_dump_info;
  //s1->trx_get_abs_tx_power_func = trx_get_abs_tx_power_func;
  //s1->trx_get_abs_rx_power_func = trx_get_abs_rx_power_func;
  //s1->trx_msg_recv_func = trx_msg_recv_func;
  //s1->trx_get_tx_gain_func = trx_get_tx_gain_func;
  //s1->trx_get_rx_gain_func = trx_get_rx_gain_func;
  //s1->trx_stop_func = trx_stop_func;
  //s1->trx_read_timestamp = trx_read_timestamp;
  //s1->trx_set_tx_streams = trx_set_tx_streams;
  //s1->trx_set_rx_streams = trx_set_rx_streams;
  //s1->trx_schedule_read = trx_schedule_read;
  //s1->trx_set_agc_func = trx_set_agc_func;
  //s1->trx_get_agc_func = trx_get_agc_func;
  //s1->trx_msg_send_func = trx_msg_send_func;

  return 0;
}