Commit c9e7e866 authored by Joanne Hugé's avatar Joanne Hugé

Master/Slave

parent f49805d2
...@@ -104,6 +104,9 @@ typedef struct { ...@@ -104,6 +104,9 @@ typedef struct {
int one_way_measure; int one_way_measure;
int one_way_period; int one_way_period;
int tdd_frame_start; int tdd_frame_start;
int timestamp_frames;
int master_timestamp_check_period;
int slave_timestamp_check_period;
} TRXEcpriState; } TRXEcpriState;
typedef struct { typedef struct {
...@@ -145,8 +148,6 @@ static ring_buffer_t trxw_group_rbuf; // Group of IQ samples ...@@ -145,8 +148,6 @@ static ring_buffer_t trxw_group_rbuf; // Group of IQ samples
static ring_buffer_t one_way_rbuf; // One way delay measurements static ring_buffer_t one_way_rbuf; // One way delay measurements
static volatile uint64_t one_way_timestamp;
// Counters // Counters
static volatile counter_stat_t recv_counter; // frames received from eRE static volatile counter_stat_t recv_counter; // frames received from eRE
static volatile counter_stat_t decode_counter; // decoded frames static volatile counter_stat_t decode_counter; // decoded frames
...@@ -181,6 +182,12 @@ static volatile int min_count_trx = 1000000; ...@@ -181,6 +182,12 @@ static volatile int min_count_trx = 1000000;
// Network // Network
static volatile int tx_seq_id; static volatile int tx_seq_id;
static volatile int rx_seq_id; static volatile int rx_seq_id;
static volatile uint64_t one_way_timestamp;
static volatile int64_t rxtx_shift;
static volatile int64_t slave_ts_offset;
static int64_t tx_initial_ts;
static int64_t sync_encode_counter;
static void rbuf_update_write_index(ring_buffer_t * rbuf) { static void rbuf_update_write_index(ring_buffer_t * rbuf) {
rbuf->write_index = (rbuf->write_index + 1) % rbuf->buf_len; rbuf->write_index = (rbuf->write_index + 1) % rbuf->buf_len;
...@@ -247,6 +254,8 @@ static void print_stats(FILE * f, int print_header) { ...@@ -247,6 +254,8 @@ static void print_stats(FILE * f, int print_header) {
"%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"\n", "\n",
"RX", "RX",
"TX", "TX",
...@@ -269,7 +278,9 @@ static void print_stats(FILE * f, int print_header) { ...@@ -269,7 +278,9 @@ static void print_stats(FILE * f, int print_header) {
"RX", "RX",
"TRXR", "TRXR",
"TRXW", "TRXW",
"TX"); "TX",
"SLAVE",
"MASTER");
fprintf(f, fprintf(f,
"%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
...@@ -293,6 +304,8 @@ static void print_stats(FILE * f, int print_header) { ...@@ -293,6 +304,8 @@ static void print_stats(FILE * f, int print_header) {
"%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s " "%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"%-" STAT_INT_LEN "s "
"\n", "\n",
"DROPPED", "DROPPED",
"DROPPED", "DROPPED",
...@@ -315,7 +328,9 @@ static void print_stats(FILE * f, int print_header) { ...@@ -315,7 +328,9 @@ static void print_stats(FILE * f, int print_header) {
"DELAY", "DELAY",
"DELAY", "DELAY",
"DELAY", "DELAY",
"DELAY"); "DELAY",
"",
"");
} }
fprintf(f, fprintf(f,
"%-" STAT_INT_LEN "" PRIi64 " " "%-" STAT_INT_LEN "" PRIi64 " "
...@@ -340,6 +355,8 @@ static void print_stats(FILE * f, int print_header) { ...@@ -340,6 +355,8 @@ static void print_stats(FILE * f, int print_header) {
"%-" STAT_INT_LEN "d " "%-" STAT_INT_LEN "d "
"%-" STAT_INT_LEN "d " "%-" STAT_INT_LEN "d "
"%-" STAT_INT_LEN "d " "%-" STAT_INT_LEN "d "
"%-" STAT_INT_LEN "" PRIi64 " "
"%-" STAT_INT_LEN "" PRIi64 " "
"\n", "\n",
rx_drop_counter.counter, rx_drop_counter.counter,
tx_drop_counter.counter, tx_drop_counter.counter,
...@@ -362,7 +379,9 @@ static void print_stats(FILE * f, int print_header) { ...@@ -362,7 +379,9 @@ static void print_stats(FILE * f, int print_header) {
rbuf_read_amount(&rx_rbuf), rbuf_read_amount(&rx_rbuf),
rbuf_read_amount(&trxr_rbuf[0]), rbuf_read_amount(&trxr_rbuf[0]),
rbuf_read_amount(&trxw_rbuf[0]), rbuf_read_amount(&trxw_rbuf[0]),
rbuf_read_amount(&tx_rbuf)); rbuf_read_amount(&tx_rbuf),
rxtx_shift,
slave_ts_offset);
} }
static void log_exit(const char * section, const char * msg, ...) { static void log_exit(const char * section, const char * msg, ...) {
...@@ -474,6 +493,15 @@ static void update_counter(volatile counter_stat_t * c, int64_t v) { ...@@ -474,6 +493,15 @@ static void update_counter(volatile counter_stat_t * c, int64_t v) {
c->counter += v; c->counter += v;
} }
#define GETTIME_MAX_C INT64_C(7000000000)
static int64_t gettime(int64_t initial_ts, int64_t counter, int64_t freq) {
int64_t ret = initial_ts;
for(; counter >= GETTIME_MAX_C; counter -= GETTIME_MAX_C)
ret += GETTIME_MAX_C * NSEC_PER_SEC / freq;
ret += (counter * NSEC_PER_SEC / freq);
return ret;
}
static void trace_handler(struct timespec initial, TRXEcpriState * s) { static void trace_handler(struct timespec initial, TRXEcpriState * s) {
struct timespec next; struct timespec next;
int ready = 1; int ready = 1;
...@@ -727,12 +755,21 @@ static void encode_empty_frames(int n, int trx, TRXEcpriState * s) { ...@@ -727,12 +755,21 @@ static void encode_empty_frames(int n, int trx, TRXEcpriState * s) {
uint8_t * buf = RBUF_WRITE0(tx_rbuf, uint8_t) + 8; uint8_t * buf = RBUF_WRITE0(tx_rbuf, uint8_t) + 8;
int n2 = n; int n2 = n;
for(int i = 0; i < n; i++) { for(int i = 0; i < n; i++) {
// ONE WAY DELAY MEASURE // ONE WAY DELAY MEASURE
if(s->one_way_measure && (encode_counter.counter + i) % s->one_way_period) if(s->one_way_measure && (encode_counter.counter + i) % s->one_way_period)
n2 += one_way_delay_measure(&buf); n2 += one_way_delay_measure(&buf);
// TDD FRAME START // TDD FRAME START
if(s->tdd_frame_start) if(s->tdd_frame_start)
*((uint8_t *)(buf + 60 * s->tx_n_channel)) = (trx && !((trx_encode_counter.counter + i) % s->tdd_period)); *((uint8_t *)(buf + 60 * s->tx_n_channel)) = (trx && !((trx_encode_counter.counter + i) % s->tdd_period));
// TIMESTAMP
if(s->timestamp_frames) {
if(s->master)
*((int64_t *)(buf + 60 * s->tx_n_channel)) = gettime(0, sync_encode_counter++ + rxtx_shift, s->frame_frequency);
else
*((int64_t *)(buf + 60 * s->tx_n_channel)) = gettime(tx_initial_ts, sync_encode_counter++, s->frame_frequency);
}
memset(buf, 0x00, 60 * s->tx_n_channel); memset(buf, 0x00, 60 * s->tx_n_channel);
*((uint16_t *)(buf - 2)) = htons(tx_seq_id++); *((uint16_t *)(buf - 2)) = htons(tx_seq_id++);
buf += tx_rbuf.len; buf += tx_rbuf.len;
...@@ -741,7 +778,7 @@ static void encode_empty_frames(int n, int trx, TRXEcpriState * s) { ...@@ -741,7 +778,7 @@ static void encode_empty_frames(int n, int trx, TRXEcpriState * s) {
if(trx) if(trx)
trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + n) % trxw_rbuf[0].buf_len; trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + n) % trxw_rbuf[0].buf_len;
} }
static void encode_trx_frames(int n, TRXEcpriState * s) { static void encode_iq_samples(int n, TRXEcpriState * s) {
int nc; int nc;
int nf = n; int nf = n;
while((nc = rbuf_contiguous_copy(&trxw_rbuf[0], &tx_rbuf, nf))) { while((nc = rbuf_contiguous_copy(&trxw_rbuf[0], &tx_rbuf, nf))) {
...@@ -752,12 +789,21 @@ static void encode_trx_frames(int n, TRXEcpriState * s) { ...@@ -752,12 +789,21 @@ static void encode_trx_frames(int n, TRXEcpriState * s) {
iq_samples[j] = ((Complex *) trxw_rbuf[j].buffer) + (trxw_rbuf[0].read_index * trxw_rbuf[0].len); iq_samples[j] = ((Complex *) trxw_rbuf[j].buffer) + (trxw_rbuf[0].read_index * trxw_rbuf[0].len);
for(int i = 0; i < nc; i++) { for(int i = 0; i < nc; i++) {
// ONE WAY DELAY MEASURE // ONE WAY DELAY MEASURE
if(s->one_way_measure && (encode_counter.counter + i) % s->one_way_period) if(s->one_way_measure && (encode_counter.counter + i) % s->one_way_period)
nc2 += one_way_delay_measure(&buf); nc2 += one_way_delay_measure(&buf);
// TDD FRAME START // TDD FRAME START
if(s->tdd_frame_start) if(s->tdd_frame_start)
*((uint8_t *)(buf + 60 * s->tx_n_channel)) = (trx && !((trx_encode_counter.counter + i) % s->tdd_period)); *((uint8_t *)(buf + 60 * s->tx_n_channel)) = !((trx_encode_counter.counter + i) % s->tdd_period);
// TIMESTAMP
if(s->timestamp_frames) {
if(s->master)
*((int64_t *)(buf + 60 * s->tx_n_channel)) = gettime(0, sync_encode_counter++ + rxtx_shift, s->frame_frequency);
else
*((int64_t *)(buf + 60 * s->tx_n_channel)) = gettime(tx_initial_ts, sync_encode_counter++, s->frame_frequency);
}
for(int i = 0; i < s->tx_n_channel ; i++) for(int i = 0; i < s->tx_n_channel ; i++)
encode_s64_b60_2(buf + i * 60, (float *) iq_samples[i]); encode_s64_b60_2(buf + i * 60, (float *) iq_samples[i]);
*((uint16_t *)(buf - 2)) = htons(tx_seq_id++); *((uint16_t *)(buf - 2)) = htons(tx_seq_id++);
...@@ -773,7 +819,7 @@ static void encode_trx_frames(int n, TRXEcpriState * s) { ...@@ -773,7 +819,7 @@ static void encode_trx_frames(int n, TRXEcpriState * s) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
int read_trx(int n_max, TRXEcpriState * s) { int encode_trx(int n_max, TRXEcpriState * s) {
int remain = n_max; int remain = n_max;
while(remain && rbuf_read_amount(&trxw_rbuf[0]) && rbuf_read_amount(&trxw_group_rbuf)) { while(remain && rbuf_read_amount(&trxw_rbuf[0]) && rbuf_read_amount(&trxw_group_rbuf)) {
...@@ -802,7 +848,7 @@ int read_trx(int n_max, TRXEcpriState * s) { ...@@ -802,7 +848,7 @@ int read_trx(int n_max, TRXEcpriState * s) {
if(g->zeroes) if(g->zeroes)
encode_empty_frames(n_trx, 1, s); encode_empty_frames(n_trx, 1, s);
else else
encode_trx_frames(n_trx, s); encode_iq_samples(n_trx, s);
if(!g->count) { if(!g->count) {
rbuf_update_read_index(&trxw_group_rbuf); rbuf_update_read_index(&trxw_group_rbuf);
...@@ -828,7 +874,6 @@ static void *encode_thread(void *p) { ...@@ -828,7 +874,6 @@ static void *encode_thread(void *p) {
TRXEcpriState * s = (TRXEcpriState *) p; TRXEcpriState * s = (TRXEcpriState *) p;
int64_t target_counter = 0; int64_t target_counter = 0;
struct timespec next; struct timespec next;
int64_t initial_ts = 0;
// Set thread CPU affinity // Set thread CPU affinity
CPU_ZERO(&mask); CPU_ZERO(&mask);
...@@ -836,6 +881,9 @@ static void *encode_thread(void *p) { ...@@ -836,6 +881,9 @@ static void *encode_thread(void *p) {
if (sched_setaffinity(0, sizeof(mask), &mask)) if (sched_setaffinity(0, sizeof(mask), &mask))
error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->encode_affinity); error(EXIT_FAILURE, errno, "Could not set CPU affinity to CPU %d\n", s->encode_affinity);
// Slave mode: first TS uses clock_gettime, then add fixed amount to timestamp
// Master mode: Computes TX RX offset using received TS to set the TS
// At each beginning of TDD frame (or less often), send empty frames to align with RX
for(int64_t i = 0;; i++) { for(int64_t i = 0;; i++) {
int n, n_trx; int n, n_trx;
int n_empty = 0; int n_empty = 0;
...@@ -843,7 +891,7 @@ static void *encode_thread(void *p) { ...@@ -843,7 +891,7 @@ static void *encode_thread(void *p) {
if(s->master && !i) { if(s->master && !i) {
clock_gettime(CLOCK_TAI, &next); clock_gettime(CLOCK_TAI, &next);
initial_ts = ts_to_int(next); tx_initial_ts = ts_to_int(next);
add_ns(&next, (((int64_t) MASTER_BURST) * NSEC_PER_SEC) / s->frame_frequency); add_ns(&next, (((int64_t) MASTER_BURST) * NSEC_PER_SEC) / s->frame_frequency);
} }
...@@ -852,17 +900,16 @@ static void *encode_thread(void *p) { ...@@ -852,17 +900,16 @@ static void *encode_thread(void *p) {
n_min += s->one_way_measure ? (s->encode_burst / s->one_way_period) + 1 : 0; n_min += s->one_way_measure ? (s->encode_burst / s->one_way_period) + 1 : 0;
if(s->master && n < n_min) if(s->master && n < n_min)
log_exit("ENCODE_THREAD", "Not enough space in TX RBUF (%d < %d)\n", n, s->encode_burst); log_exit("ENCODE_THREAD", "Not enough space in TX RBUF (%d < %d)\n", n, s->encode_burst);
n_max = s->encode_burst ? s->encode_burst : n; n_max = s->encode_burst ? s->encode_burst : n;
n_trx = read_trx(n_max, s);
n_trx = encode_trx(n_max, s);
if(s->master) { if(s->master) {
struct timespec current; struct timespec current;
n_empty = s->encode_burst - n_trx; n_empty = s->encode_burst - n_trx;
encode_empty_frames(n_empty, 0, s); encode_empty_frames(n_empty, 0, s);
target_counter += s->encode_burst; target_counter += s->encode_burst;
next = int_to_ts(initial_ts + target_counter * NSEC_PER_SEC / s->frame_frequency); next = int_to_ts(gettime(tx_initial_ts, target_counter, s->frame_frequency));
do { do {
clock_gettime(CLOCK_TAI, &current); clock_gettime(CLOCK_TAI, &current);
} while(current.tv_sec < next.tv_sec || (current.tv_sec == next.tv_sec && current.tv_nsec < next.tv_nsec)); } while(current.tv_sec < next.tv_sec || (current.tv_sec == next.tv_sec && current.tv_nsec < next.tv_nsec));
...@@ -879,6 +926,7 @@ static void *decode_thread(void *p) { ...@@ -879,6 +926,7 @@ static void *decode_thread(void *p) {
TRXEcpriState * s = (TRXEcpriState *) p; TRXEcpriState * s = (TRXEcpriState *) p;
struct timespec next; struct timespec next;
int64_t target_counter = 0; int64_t target_counter = 0;
int64_t sync_decode_counter = 0;
int reset_decode_counter = 1; int reset_decode_counter = 1;
log_info("DECODE_THREAD", "Thread init"); log_info("DECODE_THREAD", "Thread init");
...@@ -946,10 +994,22 @@ static void *decode_thread(void *p) { ...@@ -946,10 +994,22 @@ static void *decode_thread(void *p) {
rbuf_update_write_index(&one_way_rbuf); rbuf_update_write_index(&one_way_rbuf);
continue; continue;
} }
// TIMESTAMP
if(s->master && s->timestamp_frames && !(sync_decode_counter % s->master_timestamp_check_period)) {
int64_t timestamp = (int64_t) *(buf + s->rx_n_channel * 60);
rxtx_shift = gettime(0, timestamp, s->frame_frequency) - sync_decode_counter;
}
if(!s->master && s->timestamp_frames && !(sync_decode_counter % s->slave_timestamp_check_period)) {
struct timespec t;
int64_t timestamp = (int64_t) *(buf + s->rx_n_channel * 60);
clock_gettime(CLOCK_TAI, &t);
slave_ts_offset = ts_to_int(t) - timestamp;
}
for(int j = 0; j < s->rx_n_channel ; j++) { for(int j = 0; j < s->rx_n_channel ; j++) {
decode_s64_b60_2((float *) (iq_samples[j] + i * 32), buf + j * 60 + i * rx_rbuf.len); decode_s64_b60_2((float *) (iq_samples[j] + i * 32), buf + j * 60 + i * rx_rbuf.len);
} }
sync_decode_counter++;
} }
trxr_rbuf[0].write_index = (trxr_rbuf[0].write_index + nc) % trxr_rbuf[0].buf_len; trxr_rbuf[0].write_index = (trxr_rbuf[0].write_index + nc) % trxr_rbuf[0].buf_len;
...@@ -1197,8 +1257,8 @@ int startdpdk(TRXEcpriState * s) { ...@@ -1197,8 +1257,8 @@ int startdpdk(TRXEcpriState * s) {
init_counter(&sent_counter); init_counter(&sent_counter);
init_counter(&empty_encode_counter); init_counter(&empty_encode_counter);
RBUF_INIT(rx_rbuf, "RX ring buffer", s->txrx_buf_size, RX_MAX_PACKET_SIZE + s->one_way_measure, uint8_t); RBUF_INIT(rx_rbuf, "RX ring buffer", s->txrx_buf_size, RX_MAX_PACKET_SIZE + s->tdd_frame_start + s->timestamp_frames * sizeof(uint64_t), uint8_t);
RBUF_INIT(tx_rbuf, "TX ring buffer", s->txrx_buf_size, TX_ECPRI_PACKET_SIZE + s->one_way_measure, uint8_t); RBUF_INIT(tx_rbuf, "TX ring buffer", s->txrx_buf_size, TX_ECPRI_PACKET_SIZE + s->tdd_frame_start + s->timestamp_frames * sizeof(uint64_t), uint8_t);
for(int i = 0; i < s->tx_n_channel; i++) { for(int i = 0; i < s->tx_n_channel; i++) {
char name[256]; char name[256];
sprintf(name, "TRXWrite Ring Buffer %d", i); sprintf(name, "TRXWrite Ring Buffer %d", i);
...@@ -1624,6 +1684,12 @@ int trx_driver_init(TRXState *s1) ...@@ -1624,6 +1684,12 @@ int trx_driver_init(TRXState *s1)
s->one_way_period = (int) val; s->one_way_period = (int) val;
trx_get_param_double(s1, &val, "tdd_frame_start"); trx_get_param_double(s1, &val, "tdd_frame_start");
s->tdd_frame_start = (int) val; s->tdd_frame_start = (int) val;
trx_get_param_double(s1, &val, "timestamp_frames");
s->timestamp_frames = (int) val;
trx_get_param_double(s1, &val, "master_timestamp_check_period");
s->master_timestamp_check_period = (int) val;
trx_get_param_double(s1, &val, "slave_timestamp_check_period");
s->slave_timestamp_check_period = (int) val;
trx_get_param_double(s1, &val, "trx_read_null"); trx_get_param_double(s1, &val, "trx_read_null");
s->trx_read_null = (int) val; s->trx_read_null = (int) val;
trx_get_param_double(s1, &val, "tdd_period"); trx_get_param_double(s1, &val, "tdd_period");
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment