Commit 454bb157 authored by Joanne Hugé's avatar Joanne Hugé

wip

parent 93e7794b
...@@ -86,6 +86,19 @@ static volatile char * rbuf_read(ring_buffer_t * rbuf, size_t * size) { ...@@ -86,6 +86,19 @@ static volatile char * rbuf_read(ring_buffer_t * rbuf, size_t * size) {
return data; return data;
} }
/* Returns maximum contiguous size on which rbuf_src can be copied to rbuf_dst */
static size_t rbuf_contiguous_copy(ring_buffer_t * rbuf_src,
ring_buffer_t * rbuf_dst, size_t n) {
size_t ret = n;
if(rbuf_src) {
n = rbuf_src->len - rbuf_src->read_index;
ret = n < ret ? n : ret;
}
if(rbuf_dst)
n = rbuf_dst->len - rbuf_dst->write_index;
return n < ret ? n : ret;
}
#ifdef TEST #ifdef TEST
static ring_buffer_t test_rbuf; static ring_buffer_t test_rbuf;
...@@ -138,16 +151,6 @@ static int rbuf_write_amount(ring_buffer_t * rbuf) { ...@@ -138,16 +151,6 @@ static int rbuf_write_amount(ring_buffer_t * rbuf) {
// That we way we don't have to use locks // That we way we don't have to use locks
return ((rbuf->read_index + rbuf->buf_len - rbuf->write_index - 1) % rbuf->buf_len); return ((rbuf->read_index + rbuf->buf_len - rbuf->write_index - 1) % rbuf->buf_len);
} }
static int rbuf_contiguous_copy(ring_buffer_t * rbuf1, ring_buffer_t * rbuf2, int n) {
int ret = n;
if(rbuf1) {
n = rbuf1->buf_len - rbuf1->read_index;
ret = n < ret ? n : ret;
}
if(rbuf2)
n = rbuf2->buf_len - rbuf2->write_index;
return n < ret ? n : ret;
}
#define RBUF_READ0(rbuf, type) (((type *) rbuf.buffer) + (rbuf.read_index * rbuf.len)) #define RBUF_READ0(rbuf, type) (((type *) rbuf.buffer) + (rbuf.read_index * rbuf.len))
#define RBUF_WRITE0(rbuf, type) (((type *) rbuf.buffer) + (rbuf.write_index * rbuf.len)) #define RBUF_WRITE0(rbuf, type) (((type *) rbuf.buffer) + (rbuf.write_index * rbuf.len))
#define RBUF_READ(rbuf, i, type) (((type *) rbuf.buffer) + (((rbuf.read_index + i) % rbuf.buf_len) * rbuf.len)) #define RBUF_READ(rbuf, i, type) (((type *) rbuf.buffer) + (((rbuf.read_index + i) % rbuf.buf_len) * rbuf.len))
......
...@@ -30,7 +30,6 @@ ...@@ -30,7 +30,6 @@
#include <sys/types.h> #include <sys/types.h>
#include <time.h> #include <time.h>
#include <unistd.h> #include <unistd.h>
#include "private/trx_driver.h" #include "private/trx_driver.h"
#define DEBUG #define DEBUG
...@@ -39,21 +38,24 @@ ...@@ -39,21 +38,24 @@
#include "utils.c" #include "utils.c"
#include "ring_buffer.c" #include "ring_buffer.c"
// Statistics // Update period for packets per second statistic counter
// Update period for packets per second counter
#define PPS_UPDATE_PERIOD INT64_C(1000000000) #define PPS_UPDATE_PERIOD INT64_C(1000000000)
#define STAT_INT_LEN "9" #define STAT_INT_LEN "9"
#define MIN_PACKET_SIZE 64 #define MIN_PACKET_SIZE 64
#define MAX_CHANNELS 4 #define MAX_CHANNELS 4
#define ETHERNET_HEADER 14
#define ECPRI_COMMON_HEADER 4
#define ECPRI_IQ_HEADER (ECPRI_COMMON_HEADER + 4)
#define PACKET_HEADER (ETHERNET_HEADER + ECPRI_IQ_HEADER)
/* eCPRI frame structure /* eCPRI frame structure
Ethernet header: 18 bytes Ethernet header: 14 bytes
6 6 4 2 6 6 2
+---------+---------+-----+-----------+ +---------+---------+-----------+
| MAC dst | MAC src | Tag | Ethertype | | MAC dst | MAC src | Ethertype |
+---------+---------+-----+-----------+ +---------+---------+-----------+
eCPRI common header: 4 bytes eCPRI common header: 4 bytes
...@@ -63,7 +65,12 @@ eCPRI common header: 4 bytes ...@@ -63,7 +65,12 @@ eCPRI common header: 4 bytes
| | | | Type | Size | | | | | Type | Size |
+---------+----------+--------+---------+---------+ +---------+----------+--------+---------+---------+
Ethernet + eCPRI header: 26 bytes IQ data transfer message header: 4 bytes
2 2
+-------+--------+------------+
| PC_ID | SEQ_ID | IQ Samples |
+-------+--------+------------+
eCPRI concatenated frames eCPRI concatenated frames
...@@ -149,6 +156,8 @@ static volatile int seq_id; ...@@ -149,6 +156,8 @@ static volatile int seq_id;
static int send_sockfd; static int send_sockfd;
static int recv_sockfd; static int recv_sockfd;
static struct sockaddr_ll connect_sk_addr; static struct sockaddr_ll connect_sk_addr;
static uint8_t ecpri_iq_header[ECPRI_IQ_HEADER];
static uint8_t packet_header[PACKET_HEADER]; // ethernet + ecpri + iq header
static void print_stats(FILE * f, int print_header) { static void print_stats(FILE * f, int print_header) {
if(print_header) { if(print_header) {
...@@ -744,8 +753,7 @@ static int start_threads(TRXEcpriState * s) { ...@@ -744,8 +753,7 @@ static int start_threads(TRXEcpriState * s) {
int start(TRXEcpriState * s) { int start(TRXEcpriState * s) {
uint8_t dst_mac[6]; uint8_t dst_mac[6];
uint8_t src_mac[6]; uint8_t src_mac[6];
uint8_t ecpri_packet[PACKET_SIZE]; struct ether_header *eh = (struct ether_header *) packet_header;
struct ether_header *eh = (struct ether_header *) ecpri_packet;
int if_index; int if_index;
log_debug("TRX_ECPRI", "raw socket setup"); log_debug("TRX_ECPRI", "raw socket setup");
...@@ -776,31 +784,31 @@ int start(TRXEcpriState * s) { ...@@ -776,31 +784,31 @@ int start(TRXEcpriState * s) {
init_rbuf(trxr_rbuf[i], name, s->trx_buf_size, Complex); init_rbuf(trxr_rbuf[i], name, s->trx_buf_size, Complex);
} }
trx_wb_part_read_index = 0; memset((uint8_t *) packet_header, 0, PACKET_HEADER);
trx_wb_part_write_index = 0; memset((uint8_t *) ecpri_iq_header, 0, ECPRI_IQ_HEADER);
pthread_mutex_init(&tx_mutex, NULL);
pthread_mutex_init(&rx_mutex, NULL);
pthread_mutex_init(&tx_ready_mutex, NULL);
pthread_cond_init(&tx_cond, NULL); if(sscanf((char *) s->rrh_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c",
pthread_cond_init(&rx_cond, NULL); &d_addr.addr_bytes[0],
pthread_cond_init(&tx_ready_cond, NULL); &d_addr.addr_bytes[1],
&d_addr.addr_bytes[2],
sem_init(&trx_read_sem, 0, 0); &d_addr.addr_bytes[3],
&d_addr.addr_bytes[4],
memset((uint8_t *) ecpri_packet, 0, PACKET_SIZE); &d_addr.addr_bytes[5]) != 6)
fprintf(stderr, "Invalid RRH MAC address\n");
if(sscanf((char *) s->bbu_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c",
&s_addr.addr_bytes[0],
&s_addr.addr_bytes[1],
&s_addr.addr_bytes[2],
&s_addr.addr_bytes[3],
&s_addr.addr_bytes[4],
&s_addr.addr_bytes[5]) != 6)
fprintf(stderr, "Invalid BBU MAC address\n");
if (!(if_index = if_nametoindex(s->bbu_if))) { if (!(if_index = if_nametoindex(s->bbu_if))) {
perror("if_nametoindex"); perror("if_nametoindex");
return 1; return 1;
} }
if(sscanf(s->rrh_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c", &dst_mac[0], &dst_mac[1], &dst_mac[2], &dst_mac[3], &dst_mac[4], &dst_mac[5]) != 6)
fprintf(stderr, "Invalid RRH MAC address\n");
if(sscanf(s->bbu_mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c", &src_mac[0], &src_mac[1], &src_mac[2], &src_mac[3], &src_mac[4], &src_mac[5]) != 6)
fprintf(stderr, "Invalid RRH MAC address\n");
if ((send_sockfd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL))) == -1) { if ((send_sockfd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL))) == -1) {
perror("Socket Error"); perror("Socket Error");
return 1; return 1;
...@@ -815,8 +823,6 @@ int start(TRXEcpriState * s) { ...@@ -815,8 +823,6 @@ int start(TRXEcpriState * s) {
for(int i = 0; i < 6; i++) for(int i = 0; i < 6; i++)
connect_sk_addr.sll_addr[i] = dst_mac[i]; connect_sk_addr.sll_addr[i] = dst_mac[i];
log_debug("TRX_ECPRI", "bind");
for(int i = 0; i < 6; i++) for(int i = 0; i < 6; i++)
eh->ether_shost[i] = src_mac[i]; eh->ether_shost[i] = src_mac[i];
for(int i = 0; i < 6; i++) for(int i = 0; i < 6; i++)
...@@ -825,17 +831,13 @@ int start(TRXEcpriState * s) { ...@@ -825,17 +831,13 @@ int start(TRXEcpriState * s) {
/* Ethertype field */ /* Ethertype field */
eh->ether_type = htons(0xaefe); eh->ether_type = htons(0xaefe);
/* Standard Header */ /* Common Header */
ecpri_packet[14] = 0x10; // Protocol data revision 0x1, C = 0 ecpri_iq_header[0] = 0x10; // Version 0x1, Reserved = 0, C = 0
// Message type = 0x00, IQ data ecpri_iq_header[1] = 0x00; // Message type (IQ data)
// Payload size /* IQ message header */
*((uint16_t *) (ecpri_packet + 16)) = htons(244); *((uint16_t *) (ecpri_iq_header + ECPRI_COMMON_HEADER)) = htons(s->flow_id);
*((uint16_t *) (ecpri_packet + 18)) = htons(s->flow_id);
for(int i = 0; i < rxtx_buf_size; i++) { memcpy(packet_header + ETHERNET_HEADER, ecpri_iq_header, ECPRI_IQ_HEADER);
//log_debug("TRX_ECPRI", "%d / %d - %d\n", i, rxtx_buf_size, tx_rbuf.len);
memcpy(((uint8_t *) tx_rbuf.buffer) + (i * tx_rbuf.len), ecpri_packet, tx_rbuf.len);
}
start_threads(s); start_threads(s);
return 0; return 0;
...@@ -854,7 +856,8 @@ static int64_t prev_count = 0; ...@@ -854,7 +856,8 @@ static int64_t prev_count = 0;
static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void **__samples, int count, int tx_port_index, TRXWriteMetadata *md) static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void **__samples, int count, int tx_port_index, TRXWriteMetadata *md)
{ {
int write_count; int64_t ts; sample_group_t * g; int nc; int nk = 0; int write_count; int64_t ts; sample_group_t * g;
size_t nc, nk;
float ** _samples = (float **) __samples; float ** _samples = (float **) __samples;
TRXEcpriState *s = s1->opaque; TRXEcpriState *s = s1->opaque;
write_count = count / M; write_count = count / M;
...@@ -863,26 +866,19 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void ...@@ -863,26 +866,19 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
log_debug("TRX_ECPRI_WRITE", "trx_ecpri_write, count = %ld", count); log_debug("TRX_ECPRI_WRITE", "trx_ecpri_write, count = %ld", count);
if(prev_count && ((ts - prev_ts) != prev_count)) { if(prev_count && ((ts - prev_ts) != prev_count)) {
log_exit("TRX_ECPRI_WRITE", "Gap between timestamps: prev_ts %li ts %li prev_count %li count %li diff_ts %li", prev_ts, ts, prev_count, count, (ts - prev_ts)); log_exit("TRX_ECPRI_WRITE",
"Gap between timestamps: prev_ts %li ts %li prev_count %li count %li diff_ts %li",
prev_ts, ts, prev_count, count, (ts - prev_ts));
} }
prev_ts = ts; prev_count = write_count; prev_ts = ts; prev_count = write_count;
if(write_count > rbuf_write_amount(&trxw_rbuf[0])) { if(write_count > rbuf_write_amount(&trxw_rbuf[0])) {
//log_exit("TRX_ECPRI_WRITE", "Not enough space to write in trxw_rbuf (write count = %d)", write_count); //log_exit("TRX_ECPRI_WRITE",
// "Not enough space to write in trxw_rbuf (write count = %d)", write_count);
update_counter(&tx_drop_counter, write_count); update_counter(&tx_drop_counter, write_count);
return; return;
} }
if(s->trace_tx) {
if((write_counter.counter + write_count) >= (trxw_rbuf[0].buf_len + s->trace_offset)) {
tx_trace_ready = 1;
log_info("TRX_ECPRI_WRITE", "TX Trace ready");
pthread_exit(EXIT_SUCCESS);
} else if (tx_trace_ready) {
pthread_exit(EXIT_SUCCESS);
}
}
if(first_trx_write) { if(first_trx_write) {
sample_group_t * g2 = RBUF_WRITE0(trxw_group_rbuf, sample_group_t); sample_group_t * g2 = RBUF_WRITE0(trxw_group_rbuf, sample_group_t);
g2->count = ts; g2->count = ts;
...@@ -896,12 +892,14 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void ...@@ -896,12 +892,14 @@ static void trx_ecpri_write(TRXState *s1, trx_timestamp_t timestamp, const void
g->count = write_count; g->count = write_count;
while((nc = rbuf_contiguous_copy(NULL, &trxw_rbuf[0], write_count))) { while((nc = rbuf_contiguous_copy(NULL, &trxw_rbuf[0], write_count))) {
int len = nc * trxr_rbuf[0].len * sizeof(Complex); size_t len = nc * trxr_rbuf[0].len * sizeof(Complex);
if(__samples) if(__samples)
for(int i = 0; i < s->tx_n_channel; i++) { for(int i = 0; i < s->tx_n_channel; i++) {
uint8_t * src = ((uint8_t*) _samples[i]) + (nk * trxr_rbuf[0].len * sizeof(Complex)); memcpy(
uint8_t * dst = ((uint8_t *) trxw_rbuf[i].buffer) + trxw_rbuf[0].write_index * trxw_rbuf[0].len * sizeof(Complex); ((uint8_t *) trxw_rbuf[i].buffer) +\
memcpy(dst, src, len); trxw_rbuf[0].write_index * trxw_rbuf[0].len * sizeof(Complex),
((uint8_t *) _samples[i]) + nk,
len);
} }
trxw_rbuf[0].write_index = (trxw_rbuf[0].write_index + nc) % trxw_rbuf[0].buf_len; trxw_rbuf[0].write_index = (trxw_rbuf[0].write_index + nc) % trxw_rbuf[0].buf_len;
write_count -= nc; write_count -= nc;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment