Commit 14b45dba authored by Joanne Hugé's avatar Joanne Hugé

Finish rewriting encode_thread, trx_ecpri_read and trx_ecpri_write

parent 41a95732
......@@ -50,6 +50,8 @@
#define ECPRI_IQ_HEADER (ECPRI_COMMON_HEADER + 4)
#define ORAN_HEADER 8
#define PACKET_HEADER (ETHERNET_HEADER + ECPRI_IQ_HEADER + ORAN_HEADER)
#define IQ_PAYLOAD 8832
#define PACKET_SIZE (PACKET_HEADER + IQ_PAYLOAD)
/* eCPRI frame structure
......@@ -92,6 +94,13 @@ IQ throughput (4 * 20 MHz): 3 932 160 000 bytes
1 subframe = 2 slots
1 slot = 7 symbols
One ethernet frame = 8862
14 4 4 8 8832
+-----------------+---------------------+----------------+-------------+------------+
| Ethernet header | eCPRI common header | IQ data header | oRAN header | IQ payload |
+-----------------+---------------------+----------------+-------------+------------+
*/
typedef struct {
......@@ -420,12 +429,6 @@ static void *send_thread(void *p) {
pthread_exit(EXIT_SUCCESS);
}
/* If sync has happenned (=we have received frames):
Prepare as soon as TRX has packets to write
Signal
Else:
Prepare as soon as there is space in tx buffer */
#define TX_SYNC_BURST_SIZE 512
static void *encode_thread(void *p) {
cpu_set_t mask;
......@@ -446,16 +449,14 @@ static void *encode_thread(void *p) {
uint16_t * 16_bit;
int j;
// If we have frames to encode (is there space in TX buffer)
// And if there are frames from trx_write callback to encode
to_write = rbuf_write_amount(&tx_rbuf);
to_read = rbuf_read_amount(&trxw_rbuf[0]);
// If we have frames to encode (is there space in TX buffer)
// If there are frames from trx_write callback to encode
if(to_write && to_read) {
for(uint16_t antenna_id = 0 ; antenna_id < 4; antenna_id++) {
for(uint16_t antenna_id = 0 ; antenna_id < s->tx_n_channel; antenna_id++) {
data = rbuf_write(&tx_rbuf);
memcpy(data, packet_header, PACKET_HEADER);
j = ETHERNET_HEADER + ECPRI_COMMON_HEADER;
// PC_ID
......@@ -472,32 +473,29 @@ static void *encode_thread(void *p) {
// 8832 bytes of IQ samples
// TODO
int nc;
int nf = 8832;
while((nc = rbuf_contiguous_copy(&trxw_rbuf[antenna_id], &tx_rbuf, nf))) {
memcpy(data[PACKET_HEADER] + (8832 - nf),
rbuf_read(&trxw_rbuf[antenna_id], 8832 - nf),
nc);
rbuf_increment_read(&trxw_rbuf[antenna_id], nc);
tx_rbuf.write_index = (tx_rbuf.write_index + nc) % tx_rbuf.buf_len;
trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + nc) % trxw_rbuf[0].buf_len;
nf -= nc;
// Add IQ_PAYLOAD to the current packet
int write_count;
int count_left = IQ_PAYLOAD;
while((write_count = rbuf_contiguous_copy(&trxw_rbuf[antenna_id], &tx_rbuf,
count_left))) {
memcpy(data[PACKET_HEADER] + (IQ_PAYLOAD - count_left),
rbuf_read(&trxw_rbuf[antenna_id], IQ_PAYLOAD - count_left),
write_count);
rbuf_increment_read(&trxw_rbuf[antenna_id], write_count);
tx_rbuf.write_index = (tx_rbuf.write_index + write_count) % tx_rbuf.buf_len;
trxw_rbuf[0].read_index = (trxw_rbuf[0].read_index + write_count) % \
trxw_rbuf[0].buf_len;
count_left -= write_count;
}
if(nf)
if(count_left)
exit(EXIT_FAILURE);
}
update_counter(&encode_counter, nb_frames);
if(!g->count) {
rbuf_update_read_index(&trxw_group_rbuf);
}
}
// Increment counter once we wrote packets for all channels
update_counter(&encode_counter, 1);
// Update ORAN counters
slot_id = (slot_id + 1) % 2;
if(!slot_id) {
sub_frame_id = (sub_frame_id + 1) % 10;
......@@ -505,11 +503,6 @@ static void *encode_thread(void *p) {
frame_id = (frame_id + 1) % 100;
}
}
// TODO
int nc;
int nf = nb_frames;
}
}
pthread_exit(EXIT_SUCCESS);
......@@ -761,18 +754,18 @@ int start(TRXEcpriState * s) {
init_counter(&encode_counter);
init_counter(&sent_counter);
init_rbuf(&rx_rbuf, "RX ring buffer", s->txrx_buf_size, MIN_PACKET_SIZE);
init_rbuf(&tx_rbuf, "TX ring buffer", s->txrx_buf_size, MIN_PACKET_SIZE);
init_rbuf(&rx_rbuf, "RX ring buffer", s->txrx_buf_size, PACKET_SIZE);
init_rbuf(&tx_rbuf, "TX ring buffer", s->txrx_buf_size, PACKET_SIZE);
for(int i = 0; i < s->tx_n_channel; i++) {
char name[256];
sprintf(name, "TRXWrite Ring Buffer %d", i);
init_rbuf(trxw_rbuf[i], name, s->trx_buf_size, Complex);
init_rbuf(trxw_rbuf[i], name, s->trx_buf_size, sizeof(Complex) * 32);
}
for(int i = 0; i < s->rx_n_channel; i++) {
char name[256];
sprintf(name, "TRXRead Ring Buffer %d", i);
init_rbuf(trxr_rbuf[i], name, s->trx_buf_size, Complex);
init_rbuf(trxr_rbuf[i], name, s->trx_buf_size, sizeof(Complex) * 32);
}
memset((uint8_t *) packet_header, 0, PACKET_HEADER);
......@@ -839,6 +832,9 @@ int start(TRXEcpriState * s) {
ecpri_iq_header,
ECPRI_IQ_HEADER + ORAN_HEADER);
for(int i = 0; i < tx_rbuf.buf_len; i+= PACKET_SIZE)
memcpy(((uint8_t *) tx_rbuf.buffer) + i, packet_header, PACKET_HEADER);
start_threads(s);
return 0;
}
......@@ -855,11 +851,11 @@ static int64_t prev_count = 0;
/*
Callback for processing TRX samples from Amarisoft
Writes to TRX write buffer
Write count samples from __samples into TRX write buffer, exit as soon as possible
- count: how much IQ samples to write (multiple of 32)
- timestamp: should be equal to the amount of written IQ samples
- __samples: can be null if zeroes are to be written
- __samples: can be null if zeroes are to be written (TDD only)
*/
static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void **__samples, int count, int tx_port_index, TRXWriteMetadata *md)
{
......@@ -878,7 +874,7 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
prev_timestamp = timestamp; prev_count = count;
// Drop samples if we don't have enough space in trx write buffer
if(count > (rbuf_write_amount(&trxw_rbuf[0]) / sizeof(Complex)) ) {
if( (count * sizeof(Complex)) > rbuf_write_amount(&trxw_rbuf[0]) ) {
//log_exit("TRX_ECPRI_WRITE",
// "Not enough space to write in trxw_rbuf (count = %d)", count);
update_counter(&tx_drop_counter, count / N_SAMPLES);
......@@ -887,8 +883,7 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
offset = 0;
count_left = count;
while((nc = rbuf_contiguous_copy(
NULL, &trxw_rbuf[0], count_left * sizeof(Complex)))) {
while((nc = rbuf_contiguous_copy(NULL, &trxw_rbuf[0], count_left * sizeof(Complex)))) {
for(int i = 0; i < s->tx_n_channel; i++) {
if(__samples)
memcpy(
......@@ -910,7 +905,7 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
/*
Callback for sending TRX samples to Amarisoft
Reads from TRX read buffer
Put count IQ samples from TRX read buffer in __samples, return as soon as possible
- count: how much IQ samples to read (multiple of 32)
- ptimestamp: number of total read IQ samples
......@@ -926,14 +921,14 @@ static int trx_ecpri_read(TRXState *s1, trx_timestamp_t *ptimestamp, void **__sa
log_debug("TRX_ECPRI_READ", "count = %ld (%li)",
count / N_SAMPLES, read_counter.counter);
while((rbuf_read_amount(&trxr_rbuf[0]) / sizeof(Complex)) < count);
// Wait to have enough sampels in TRX read buffer
while( rbuf_read_amount(&trxr_rbuf[0]) < (count * sizeof(Complex)) );
sync_complete = 1;
offset = 0;
count_left = count;
while((nc = rbuf_contiguous_copy(
&trxr_rbuf[0], NULL, count_left * sizeof(Complex)))) {
while((nc = rbuf_contiguous_copy(&trxr_rbuf[0], NULL, count_left * sizeof(Complex)))) {
for(int i = 0; i < s->rx_n_channel; i++ ) {
memcpy(
((uint8_t*) _samples[i]) + offset,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment