Commit bcc89588 authored by Joanne Hugé's avatar Joanne Hugé

WIP: use structs for packets and headers

parent 815167eb
......@@ -147,6 +147,43 @@ typedef struct {
int64_t pps;
} counter_stat_t;
typedef struct {
uint8_t mac_dst[6];
uint8_t mac_src[6];
uint16_t ethertype;
uint8_t ecpri_version;
uint8_t ecpri_type;
uint16_t payload_size;
uint16_t antenna_id;
uint16_t seq_id;
} ecpri_header;
typedef struct {
uint8_t mac_dst[6];
uint8_t mac_src[6];
uint16_t ethertype;
uint8_t ecpri_version;
uint8_t ecpri_type;
uint16_t payload_size;
uint16_t antenna_id;
uint16_t seq_id;
uint8_t oran_header[ORAN_HEADER];
uint8_t iq_samples[IQ_PAYLOAD];
} ecpri_iq_packet;
typedef struct {
uint8_t mac_dst[6];
uint8_t mac_src[6];
uint16_t ethertype;
uint8_t ecpri_version;
uint8_t ecpri_type;
uint16_t payload_size;
uint16_t antenna_id;
uint16_t seq_id;
uint8_t oran_header[ORAN_HEADER];
uint8_t gps_time[10];
} ecpri_timing_packet;
// Buffers
static ring_buffer_t trxr_rbuf[MAX_CHANNELS]; // Decoded IQ samples
static ring_buffer_t trxw_rbuf[MAX_CHANNELS]; // Uncompressed IQ samples
......@@ -162,8 +199,7 @@ static counter_stat_t rx_drop_counter; // frames sent to RRH
static counter_stat_t tx_drop_counter; // frames sent to RRH
// Network
static int prev_seq_id = -1;
static int seq_id;
static int tx_seq_id;
static uint8_t frame_id;
static uint8_t sub_frame_id;
static uint8_t slot_id;
......@@ -348,7 +384,7 @@ void print_packet(uint8_t * data, int length) {
int offset = 0;
int j;
uint16_t antenna_id;
int _seq_id;
int seq_id;
print_bytes("MAC DST: ", data, &offset, 6);
print_bytes("MAC SRC: ", data, &offset, 6);
print_bytes("Ethertype: ", data, &offset, 2);
......@@ -357,7 +393,7 @@ void print_packet(uint8_t * data, int length) {
//print_bytes("IQ payload:\n", data, &offset, length - offset);
j = ETHERNET_HEADER + ECPRI_COMMON_HEADER;
antenna_id = ntohs(*((uint16_t *) (data + j + 0)));
_seq_id = data[j + 2];
seq_id = data[j + 2];
printf("Antenna ID: %d\n", antenna_id);
printf("Seq ID: %d\n", _seq_id);
}
......@@ -370,9 +406,13 @@ static void *recv_thread(void *p) {
cpu_set_t mask;
int stop = 0;
int prev_seq_id = -1;
int offset;
struct mmsghdr msgh[MAX_RX_BURST];
struct iovec msgv[MAX_RX_BURST];
uint8_t * data;
ecpri_header * header;
ecpri_iq_packet * iq_packet;
ecpri_timing_packet * timing_packet;
TRXEcpriState * s = (TRXEcpriState *) p;
log_info("RECV_THREAD", "Thread init");
......@@ -385,9 +425,9 @@ static void *recv_thread(void *p) {
for(int64_t i = 0;; i++) {
// Reset data structures for recv messages
memset(msgv, 0, sizeof(msgv));
memset(msgh, 0, sizeof(msgh));
for(int j = 0; j < s->rx_burst; j++) {
msgv[j].iov_base = rx_buf + j * PACKET_SIZE;
msgv[j].iov_len = PACKET_SIZE;
......@@ -395,43 +435,66 @@ static void *recv_thread(void *p) {
msgh[j].msg_hdr.msg_iovlen = 1;
}
// Receive at most rx_burst messages
int ret = recvmmsg(recv_sockfd, msgh, s->rx_burst, 0, NULL);
if(ret <= -1)
error(EXIT_FAILURE, errno, "recvmmsg error");
// Process each received message
for(int j = 0; j < ret; j++) {
data = rx_buf + j * PACKET_SIZE;
header = (ecpri_header*) (rx_buf + j * PACKET_SIZE);
if( ((uint16_t *) data)[6] != 0xfeae) {
stop = 1;
log_info("DECODE_THREAD", "%d != %d\n", ((uint16_t *) data)[6], 0xaefe);
break;
}
else if((msgh + j)->msg_len != PACKET_SIZE) {
// Discard packet if it is not eCPRI
if(header->ethertype != 0xfeae)
continue;
// Stop if packet has unexpected size
if((msgh + j)->msg_len != PACKET_SIZE) {
stop = 1;
log_info("DECODE_THREAD", "Packet doesn't have correct size (%d != %d)", (msgh + j)->msg_len, PACKET_SIZE);
break;
}
int offset = ETHERNET_HEADER + ECPRI_COMMON_HEADER;
uint16_t antenna_id = ntohs(*((uint16_t *) (data + offset + 0)));
int _seq_id = data[offset + 2];
if ( prev_seq_id != -1 && (_seq_id + 256 - prev_seq_id) % 256 != 1 ) {
stop = 1;
log_info("DECODE_THREAD", "seq_ids are not sequential (%d, %d)", prev_seq_id, _seq_id);
break;
// If packet is a timing packet
if(header->ecpri_type == 255) {
timing_packet = (ecpri_timing_packet*) (rx_buf + j * PACKET_SIZE);
log_info("DECODE_THREAD", "GPS TIME: %02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
timing_packet->gps_time[0],
timing_packet->gps_time[1],
timing_packet->gps_time[2],
timing_packet->gps_time[3],
timing_packet->gps_time[4],
timing_packet->gps_time[5],
timing_packet->gps_time[6],
timing_packet->gps_time[7],
timing_packet->gps_time[8],
timing_packet->gps_time[9]);
continue;
}
if(antenna_id > s->rx_n_channel || antenna_id < 0) {
stop = 1;
log_info("DECODE_THREAD", "Wrong Antenna ID: %d\n", antenna_id);
break;
// Exit if packet is neither timing nor IQ
else if(header->ecpri_type != 0) {
log_info("DECODE_THREAD", "Unknown eCPRI type: %d\n", header->ecpri_type);
stop = 1; break;
}
// Exit if SEQ ID is not sequential
if ( prev_seq_id != -1 && (header->seq_id + 256 - prev_seq_id) % 256 != 1 ) {
log_info("DECODE_THREAD", "seq_ids are not sequential (%d, %d)",
prev_seq_id, header->seq_id);
stop = 1; break;
}
prev_seq_id = _seq_id;
if(!rbuf_write_amount(&trxr_rbuf[antenna_id]))
log_exit("DECODE_THREAD", "No more space in %s buffer", trxr_rbuf[antenna_id].name);
memcpy(trxr_rbuf[antenna_id].buffer,
data + PACKET_HEADER,
IQ_PAYLOAD);
rbuf_increment_write(&trxr_rbuf[antenna_id], IQ_PAYLOAD);
// Exit if antenna ID is not in range
if(header->antenna_id > s->rx_n_channel || header->antenna_id < 0) {
log_info("DECODE_THREAD", "Wrong Antenna ID: %d\n", header->antenna_id);
stop = 1; break;
}
// Exit if there is no more space in the buffer
if(!rbuf_write_amount(&trxr_rbuf[header->antenna_id]))
log_exit("DECODE_THREAD", "No more space in %s buffer",
trxr_rbuf[header->antenna_id].name);
prev_seq_id = header->seq_id;
iq_packet = (ecpri_iq_packet*) (rx_buf + j * PACKET_SIZE);
memcpy(trxr_rbuf[header->antenna_id].buffer,
iq_packet->iq_samples,
IQ_PAYLOAD);
rbuf_increment_write(&trxr_rbuf[header->antenna_id], IQ_PAYLOAD);
update_counter(&recv_counter, 1);
}
if(stop) {
......@@ -480,6 +543,7 @@ static void *send_thread(void *p) {
log_info("SEND_THREAD", "Starting loop");
for(int64_t i = 1;; i++) {
// Send at most tx_burst packets
int to_send = s->tx_burst / s->tx_n_channel;
for(int k = 0; k < s->tx_n_channel; k++) {
int to_read = rbuf_read_amount(&trxw_rbuf[k]) / IQ_PAYLOAD;
......@@ -487,6 +551,7 @@ static void *send_thread(void *p) {
to_send = to_read;
}
to_send *= s->tx_n_channel;
for(int encoded = 0; encoded < to_send;) {
for(uint16_t antenna_id = 0 ; antenna_id < s->tx_n_channel; antenna_id++) {
data = tx_buf + encoded * PACKET_SIZE;
......@@ -495,8 +560,8 @@ static void *send_thread(void *p) {
// PC_ID
*((uint16_t *) (data + j + 0)) = ntohs(antenna_id);
// SEQ_ID
data[j + 2] = (uint8_t) seq_id;
seq_id = (seq_id + 1) % 256;
data[j + 2] = (uint8_t) tx_seq_id;
tx_seq_id = (tx_seq_id + 1) % 256;
data[j + 3] = 0x80;
j = ETHERNET_HEADER + ECPRI_IQ_HEADER;
// ORAN counters
......@@ -655,7 +720,7 @@ int start(TRXEcpriState * s) {
//set_latency_target();
seq_id = 0;
tx_seq_id = 0;
init_counter(&rx_drop_counter);
init_counter(&tx_drop_counter);
init_counter(&recv_counter);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment