Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
T
trx-ecpri
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
trx-ecpri
Commits
65f8d8de
Commit
65f8d8de
authored
Dec 26, 2024
by
Joanne Hugé
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
wip
parent
f602b253
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
805 additions
and
5 deletions
+805
-5
trx_ecpri.c
trx_ecpri.c
+22
-5
trx_ecpri_raw_archive.c
trx_ecpri_raw_archive.c
+783
-0
No files found.
trx_ecpri.c
View file @
65f8d8de
...
@@ -48,7 +48,8 @@
...
@@ -48,7 +48,8 @@
#define ETHERNET_HEADER 14
#define ETHERNET_HEADER 14
#define ECPRI_COMMON_HEADER 4
#define ECPRI_COMMON_HEADER 4
#define ECPRI_IQ_HEADER (ECPRI_COMMON_HEADER + 4)
#define ECPRI_IQ_HEADER (ECPRI_COMMON_HEADER + 4)
#define PACKET_HEADER (ETHERNET_HEADER + ECPRI_IQ_HEADER)
#define ORAN_HEADER 8
#define PACKET_HEADER (ETHERNET_HEADER + ECPRI_IQ_HEADER + ORAN_HEADER)
/* eCPRI frame structure
/* eCPRI frame structure
...
@@ -157,7 +158,7 @@ static volatile int seq_id;
...
@@ -157,7 +158,7 @@ static volatile int seq_id;
static
int
send_sockfd
;
static
int
send_sockfd
;
static
int
recv_sockfd
;
static
int
recv_sockfd
;
static
struct
sockaddr_ll
connect_sk_addr
;
static
struct
sockaddr_ll
connect_sk_addr
;
static
uint8_t
ecpri_iq_header
[
ECPRI_IQ_HEADER
];
static
uint8_t
ecpri_iq_header
[
ECPRI_IQ_HEADER
+
ORAN_HEADER
];
static
uint8_t
packet_header
[
PACKET_HEADER
];
// ethernet + ecpri + iq header
static
uint8_t
packet_header
[
PACKET_HEADER
];
// ethernet + ecpri + iq header
static
void
print_stats
(
FILE
*
f
,
int
print_header
)
{
static
void
print_stats
(
FILE
*
f
,
int
print_header
)
{
...
@@ -419,6 +420,7 @@ static void *encode_thread(void *p) {
...
@@ -419,6 +420,7 @@ static void *encode_thread(void *p) {
int64_t
target_counter
=
0
;
int64_t
target_counter
=
0
;
struct
timespec
next
;
struct
timespec
next
;
int
reset_encode_counter
=
1
;
int
reset_encode_counter
=
1
;
uint8_t
*
data
;
// Set thread CPU affinity
// Set thread CPU affinity
CPU_ZERO
(
&
mask
);
CPU_ZERO
(
&
mask
);
...
@@ -436,7 +438,13 @@ static void *encode_thread(void *p) {
...
@@ -436,7 +438,13 @@ static void *encode_thread(void *p) {
// If there are frames from trx_write callback to encode
// If there are frames from trx_write callback to encode
if
(
to_write
&&
to_read
)
{
if
(
to_write
&&
to_read
)
{
data
=
rbuf_write
(
&
tx_rbuf
);
memcpy
(
data
,
packet_header
,
PACKET_HEADER
);
// SEQ_ID
*
((
uint16_t
*
)
data
[
ETHERNET_HEADER
+
ECPRI_COMMON_HEADER
+
2
])
=
htons
(
seq_id
++
);
// Frame ID
data
[
ETHERNET_HEADER
+
ECPRI_IQ_HEADER
+
1
]
=
seq_id
;
// TODO
// TODO
...
@@ -735,7 +743,7 @@ int start(TRXEcpriState * s) {
...
@@ -735,7 +743,7 @@ int start(TRXEcpriState * s) {
}
}
memset
((
uint8_t
*
)
packet_header
,
0
,
PACKET_HEADER
);
memset
((
uint8_t
*
)
packet_header
,
0
,
PACKET_HEADER
);
memset
((
uint8_t
*
)
ecpri_iq_header
,
0
,
ECPRI_IQ_HEADER
);
memset
((
uint8_t
*
)
ecpri_iq_header
,
0
,
ECPRI_IQ_HEADER
+
ORAN_HEADER
);
if
(
sscanf
((
char
*
)
s
->
rrh_mac
,
"%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c"
,
if
(
sscanf
((
char
*
)
s
->
rrh_mac
,
"%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c"
,
&
d_addr
.
addr_bytes
[
0
],
&
d_addr
.
addr_bytes
[
0
],
...
@@ -787,7 +795,16 @@ int start(TRXEcpriState * s) {
...
@@ -787,7 +795,16 @@ int start(TRXEcpriState * s) {
/* IQ message header */
/* IQ message header */
*
((
uint16_t
*
)
(
ecpri_iq_header
+
ECPRI_COMMON_HEADER
))
=
htons
(
s
->
flow_id
);
*
((
uint16_t
*
)
(
ecpri_iq_header
+
ECPRI_COMMON_HEADER
))
=
htons
(
s
->
flow_id
);
memcpy
(
packet_header
+
ETHERNET_HEADER
,
ecpri_iq_header
,
ECPRI_IQ_HEADER
);
/* ORAN HEADER */
ecpri_iq_header
[
ECPRI_IQ_HEADER
]
=
0x90
;
ecpri_iq_header
[
ECPRI_IQ_HEADER
+
3
]
=
0x00
;
ecpri_iq_header
[
ECPRI_IQ_HEADER
+
4
]
=
0x00
;
ecpri_iq_header
[
ECPRI_IQ_HEADER
+
5
]
=
0x00
;
ecpri_iq_header
[
ECPRI_IQ_HEADER
+
6
]
=
0x00
;
memcpy
(
packet_header
+
ETHERNET_HEADER
,
ecpri_iq_header
,
ECPRI_IQ_HEADER
+
ORAN_HEADER
);
start_threads
(
s
);
start_threads
(
s
);
return
0
;
return
0
;
...
...
trx_ecpri_raw_archive.c
0 → 100644
View file @
65f8d8de
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <error.h>
#include <fcntl.h>
#include <getopt.h>
#include <immintrin.h>
#include <inttypes.h>
#include <limits.h>
#include <linux/if_packet.h>
#include <math.h>
#include <netdb.h>
#include <netinet/ether.h>
#include <netinet/in.h>
#include <net/if.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <signal.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include "trx_driver.h"
#define DEBUG
#define SSE4
/* define if CPU supports SSE4.1 */
#include "private.c"
/* eCPRI Send and Recv */
#define PACKET_SIZE 262
#define FRAME_FREQ INT64_C(3840000)
#define SEND_LIMIT 1250
#define TRX_WB_MAX_PARTS 1000
#define TRX_BUF_MAX_SIZE 1000
static
void
log_error
(
const
char
*
section
,
const
char
*
msg
,
...)
{
time_t
t
;
struct
tm
ts
;
char
line
[
256
];
va_list
arglist
;
time
(
&
t
);
ts
=
*
localtime
(
&
t
);
strftime
(
line
,
80
,
"%m-%d %H:%M:%S"
,
&
ts
);
sprintf
(
line
+
strlen
(
line
),
" ERROR [%s] "
,
section
);
va_start
(
arglist
,
msg
);
vsprintf
(
line
+
strlen
(
line
),
msg
,
arglist
);
va_end
(
arglist
);
puts
(
line
);
exit
(
EXIT_FAILURE
);
}
static
void
log_info
(
const
char
*
section
,
const
char
*
msg
,
...)
{
time_t
t
;
struct
tm
ts
;
char
line
[
256
];
va_list
arglist
;
time
(
&
t
);
ts
=
*
localtime
(
&
t
);
strftime
(
line
,
80
,
"%m-%d %H:%M:%S"
,
&
ts
);
sprintf
(
line
+
strlen
(
line
),
" INFO [%s] "
,
section
);
va_start
(
arglist
,
msg
);
vsprintf
(
line
+
strlen
(
line
),
msg
,
arglist
);
va_end
(
arglist
);
puts
(
line
);
}
#ifdef DEBUG
static
void
log_debug
(
const
char
*
section
,
const
char
*
msg
,
...)
{
time_t
t
;
struct
tm
ts
;
char
line
[
256
];
va_list
arglist
;
time
(
&
t
);
ts
=
*
localtime
(
&
t
);
strftime
(
line
,
80
,
"%m-%d %H:%M:%S"
,
&
ts
);
sprintf
(
line
+
strlen
(
line
),
" DEBUG [%s] "
,
section
);
va_start
(
arglist
,
msg
);
vsprintf
(
line
+
strlen
(
line
),
msg
,
arglist
);
va_end
(
arglist
);
puts
(
line
);
}
#else
#define log_debug(...)
#endif
static
int
latency_target_fd
=
-
1
;
static
int32_t
latency_target_value
=
0
;
/* Latency trick
* if the file /dev/cpu_dma_latency exists,
* open it and write a zero into it. This will tell
* the power management system not to transition to
* a high cstate (in fact, the system acts like idle=poll)
* When the fd to /dev/cpu_dma_latency is closed, the behavior
* goes back to the system default.
*
* Documentation/power/pm_qos_interface.txt
*/
void
set_latency_target
(
void
)
{
struct
stat
s
;
int
err
;
errno
=
0
;
err
=
stat
(
"/dev/cpu_dma_latency"
,
&
s
);
if
(
err
==
-
1
)
{
error
(
EXIT_FAILURE
,
errno
,
"WARN: stat /dev/cpu_dma_latency failed"
);
return
;
}
errno
=
0
;
latency_target_fd
=
open
(
"/dev/cpu_dma_latency"
,
O_RDWR
);
if
(
latency_target_fd
==
-
1
)
{
error
(
EXIT_FAILURE
,
errno
,
"WARN: open /dev/cpu_dma_latency"
);
return
;
}
errno
=
0
;
err
=
write
(
latency_target_fd
,
&
latency_target_value
,
4
);
if
(
err
<
1
)
{
error
(
EXIT_FAILURE
,
errno
,
"# error setting cpu_dma_latency to %d!"
,
latency_target_value
);
close
(
latency_target_fd
);
return
;
}
printf
(
"# /dev/cpu_dma_latency set to %dus
\n
"
,
latency_target_value
);
}
typedef
struct
{
volatile
void
*
buffer
;
char
name
[
64
];
size_t
buf_len
;
size_t
len
;
volatile
int
write_index
;
volatile
int
read_index
;
}
ring_buffer_t
;
typedef
struct
{
const
char
*
re_mac
;
const
char
*
rec_mac
;
const
char
*
rec_if
;
int
recv_affinity
;
int
send_affinity
;
int
prepare_affinity
;
int
decompress_affinity
;
int
ecpri_period
;
int
flow_id
;
int
sample_rate
;
}
TRXEcpriState
;
// Buffers
static
ring_buffer_t
rx_rbuf
;
static
ring_buffer_t
trx_read_rbuf
;
static
ring_buffer_t
tx_rbuf
;
static
ring_buffer_t
trx_write_rbuf
;
static
volatile
int
trx_wb_part
[
TRX_WB_MAX_PARTS
];
// TODO write next index instead of current
static
volatile
int64_t
trx_wb_ts
[
TRX_WB_MAX_PARTS
];
static
int
trx_wb_part_read_index
;
static
int
trx_wb_part_write_index
;
// Locks
pthread_mutex_t
tx_mutex
;
pthread_cond_t
tx_cond
;
pthread_mutex_t
rx_mutex
;
pthread_cond_t
rx_cond
;
pthread_mutex_t
tx_ready_mutex
;
pthread_cond_t
tx_ready_cond
;
sem_t
trx_read_sem
;
// Counters
static
volatile
int64_t
prepared_frame_count
;
static
volatile
int64_t
read_frame_count
;
static
volatile
int64_t
sent_frame_count
;
// Computed values
static
int
rxtx_buf_size
;
static
int
ecpri_period_mult
;
// Network
static
volatile
int
seq_id
;
static
int
send_sockfd
;
static
int
recv_sockfd
;
static
struct
sockaddr_ll
connect_sk_addr
;
// Timestamps utils
#define NSEC_PER_SEC INT64_C(1000000000)
static
struct
timespec
int_to_ts
(
int64_t
t
)
{
struct
timespec
ts
;
ts
.
tv_sec
=
t
/
NSEC_PER_SEC
;
ts
.
tv_nsec
=
t
-
(
ts
.
tv_sec
*
NSEC_PER_SEC
);
return
ts
;
}
static
int64_t
ts_to_int
(
struct
timespec
ts
)
{
return
ts
.
tv_sec
*
NSEC_PER_SEC
+
ts
.
tv_nsec
;
}
static
void
add_ns
(
struct
timespec
*
t
,
int64_t
ns
)
{
t
->
tv_nsec
+=
ns
;
while
(
t
->
tv_nsec
>=
((
int64_t
)
NSEC_PER_SEC
))
{
t
->
tv_sec
+=
1
;
t
->
tv_nsec
-=
NSEC_PER_SEC
;
}
}
static
int64_t
calcdiff_ns
(
struct
timespec
t1
,
struct
timespec
t2
)
{
int64_t
diff
;
diff
=
NSEC_PER_SEC
*
((
int
)
t1
.
tv_sec
-
(
int
)
t2
.
tv_sec
);
diff
+=
((
int
)
t1
.
tv_nsec
-
(
int
)
t2
.
tv_nsec
);
return
diff
;
}
static
void
rbuf_update_write_index
(
ring_buffer_t
*
rbuf
)
{
rbuf
->
write_index
=
(
rbuf
->
write_index
+
1
)
%
rbuf
->
buf_len
;
}
static
void
rbuf_update_read_index
(
ring_buffer_t
*
rbuf
)
{
rbuf
->
read_index
=
(
rbuf
->
read_index
+
1
)
%
rbuf
->
buf_len
;
}
static
int
rbuf_read_amount
(
const
ring_buffer_t
*
rbuf
)
{
return
(
rbuf
->
read_index
+
rbuf
->
buf_len
-
rbuf
->
write_index
)
%
rbuf
->
buf_len
;
}
static
int
rbuf_write_amount
(
const
ring_buffer_t
*
rbuf
)
{
return
(
rbuf
->
write_index
+
rbuf
->
buf_len
-
rbuf
->
read_index
)
%
rbuf
->
buf_len
;
}
#define RBUF_READ(rbuf, type) (((type *) rbuf.buffer) + (rbuf.read_index * rbuf.len))
#define RBUF_WRITE(rbuf, type) (((type *) rbuf.buffer) + (rbuf.write_index * rbuf.len))
#define RBUF_INIT(rbuf, _name, _buf_len, _len, type) do\
{\
log_debug("TRX_ECPRI", "Allocating %s with %d bytes\n", _name, (_buf_len * _len));\
rbuf.buffer = (type *) malloc(_buf_len * _len);\
strcpy(rbuf.name, _name);\
rbuf.buf_len = _buf_len;\
rbuf.len = _len;\
rbuf.write_index = 0;\
rbuf.read_index = 0;\
} while(0)
static
void
*
recv_thread
(
void
*
p
)
{
cpu_set_t
mask
;
TRXEcpriState
*
s
=
(
TRXEcpriState
*
)
p
;
int
ret
;
log_info
(
"RECV_THREAD"
,
"Thread init"
);
// Set thread CPU affinity
CPU_ZERO
(
&
mask
);
CPU_SET
(
s
->
recv_affinity
,
&
mask
);
if
(
sched_setaffinity
(
0
,
sizeof
(
mask
),
&
mask
))
error
(
EXIT_FAILURE
,
errno
,
"Could not set CPU affinity to CPU %d
\n
"
,
s
->
recv_affinity
);
for
(;;)
{
struct
mmsghdr
msgh
[
4000
];
struct
iovec
msgv
[
4000
];
memset
(
msgv
,
0
,
sizeof
(
msgv
));
memset
(
msgh
,
0
,
sizeof
(
msgh
));
for
(
int
j
=
0
;
j
<
ecpri_period_mult
;
j
++
)
{
msgv
[
j
].
iov_base
=
RBUF_WRITE
(
rx_rbuf
,
uint8_t
);
msgv
[
j
].
iov_len
=
rx_rbuf
.
len
;
msgh
[
j
].
msg_hdr
.
msg_iov
=
&
msgv
[
j
];
msgh
[
j
].
msg_hdr
.
msg_iovlen
=
1
;
rbuf_update_write_index
(
&
rx_rbuf
);
}
ret
=
recvmmsg
(
recv_sockfd
,
msgh
,
ecpri_period_mult
,
0
,
NULL
);
if
(
ret
==
-
1
)
error
(
EXIT_FAILURE
,
errno
,
"recvmmsg error"
);
if
(
ret
!=
ecpri_period_mult
)
log_error
(
"RECV_THREAD"
,
"recvmmsg received %d messages instead of %d
\n
"
,
ret
,
ecpri_period_mult
);
pthread_mutex_lock
(
&
rx_mutex
);
pthread_cond_signal
(
&
rx_cond
);
pthread_mutex_unlock
(
&
rx_mutex
);
}
pthread_exit
(
EXIT_SUCCESS
);
}
static
void
*
send_thread
(
void
*
p
)
{
cpu_set_t
mask
;
struct
timespec
initial
,
next
;
struct
timespec
t1
[
4000
];
struct
timespec
t2
[
4000
];
int
k
=
0
;
TRXEcpriState
*
s
=
(
TRXEcpriState
*
)
p
;
struct
mmsghdr
msgh
[
4000
];
struct
iovec
msgv
[
4000
];
log_info
(
"SEND_THREAD"
,
"Thread init"
);
// Set thread CPU affinity
CPU_ZERO
(
&
mask
);
CPU_SET
(
s
->
send_affinity
,
&
mask
);
if
(
sched_setaffinity
(
0
,
sizeof
(
mask
),
&
mask
))
error
(
EXIT_FAILURE
,
errno
,
"Could not set CPU affinity to CPU %d
\n
"
,
s
->
send_affinity
);
memset
(
msgv
,
0
,
sizeof
(
msgv
));
memset
(
msgh
,
0
,
sizeof
(
msgh
));
for
(
int
j
=
0
;
j
<
ecpri_period_mult
;
j
++
)
{
msgh
[
j
].
msg_hdr
.
msg_name
=
&
connect_sk_addr
;
msgh
[
j
].
msg_hdr
.
msg_namelen
=
sizeof
(
connect_sk_addr
);
msgh
[
j
].
msg_hdr
.
msg_iov
=
&
msgv
[
j
];
msgh
[
j
].
msg_hdr
.
msg_iovlen
=
1
;
}
pthread_mutex_lock
(
&
tx_ready_mutex
);
pthread_cond_wait
(
&
tx_ready_cond
,
&
tx_ready_mutex
);
pthread_mutex_unlock
(
&
tx_ready_mutex
);
clock_gettime
(
CLOCK_TAI
,
&
initial
);
for
(
int64_t
i
=
1
;;
i
++
)
{
int
ret
,
msg_sent
;
#ifdef DEBUG
if
(
i
>
SEND_LIMIT
)
{
int64_t
d
,
dt
;
clock_gettime
(
CLOCK_TAI
,
&
next
);
d
=
calcdiff_ns
(
next
,
initial
);
for
(
int
j
=
0
;
j
<
k
;
j
++
)
{
dt
=
calcdiff_ns
(
t2
[
j
],
t1
[
j
]);
log_debug
(
"SEND_THREAD"
,
"%"
PRIi64
,
dt
);
}
log_debug
(
"SEND_THREAD"
,
"Packets sent: %"
PRIi64
,
sent_frame_count
);
log_debug
(
"SEND_THREAD"
,
"Duration: %"
PRIi64
,
d
);
log_debug
(
"SEND_THREAD"
,
"ecpri_period_mult: %"
PRIi64
,
ecpri_period_mult
);
log_debug
(
"SEND_THREAD"
,
"FRAME_FREQ: %"
PRIi64
,
FRAME_FREQ
);
exit
(
EXIT_SUCCESS
);
}
#endif
next
=
initial
;
// Multiply by i everytime to prevent any frequence drift
add_ns
(
&
next
,
(
ecpri_period_mult
*
NSEC_PER_SEC
*
i
)
/
FRAME_FREQ
);
for
(
int
j
=
0
;
j
<
ecpri_period_mult
;
j
++
)
{
msgv
[
j
].
iov_base
=
RBUF_READ
(
tx_rbuf
,
uint8_t
);
msgv
[
j
].
iov_len
=
tx_rbuf
.
len
;
rbuf_update_read_index
(
&
tx_rbuf
);
}
for
(
msg_sent
=
0
;
msg_sent
<
ecpri_period_mult
;)
{
#ifdef DEBUG
clock_gettime
(
CLOCK_TAI
,
&
t1
[
k
]);
#endif
ret
=
sendmmsg
(
send_sockfd
,
msgh
+
msg_sent
,
(
ecpri_period_mult
-
msg_sent
),
0
);
#ifdef DEBUG
clock_gettime
(
CLOCK_TAI
,
&
t2
[
k
++
]);
#endif
if
(
ret
<=
0
)
error
(
EXIT_FAILURE
,
errno
,
"sendmmsg error (returned %d)"
,
ret
);
msg_sent
+=
ret
;
sent_frame_count
+=
ret
;
}
pthread_mutex_lock
(
&
tx_mutex
);
pthread_cond_signal
(
&
tx_cond
);
pthread_mutex_unlock
(
&
tx_mutex
);
clock_nanosleep
(
CLOCK_TAI
,
TIMER_ABSTIME
,
&
next
,
NULL
);
}
pthread_exit
(
EXIT_SUCCESS
);
}
static
void
*
prepare_thread
(
void
*
p
)
{
cpu_set_t
mask
;
TRXEcpriState
*
s
=
(
TRXEcpriState
*
)
p
;
int
tx_ready_buffer_full
=
0
;
log_info
(
"PREPARE_THREAD"
,
"Thread init"
);
// Set thread CPU affinity
CPU_ZERO
(
&
mask
);
CPU_SET
(
s
->
prepare_affinity
,
&
mask
);
if
(
sched_setaffinity
(
0
,
sizeof
(
mask
),
&
mask
))
error
(
EXIT_FAILURE
,
errno
,
"Could not set CPU affinity to CPU %d
\n
"
,
s
->
prepare_affinity
);
for
(
int64_t
i
=
0
;;
i
++
)
{
int16_t
samples_int
[
256
];
// If we have frames to prepare
int
n
=
rbuf_write_amount
(
&
tx_rbuf
);
if
((
i
==
0
)
||
n
)
{
// If there are frames from trx_write callback to prepare
if
(
rbuf_read_amount
(
&
trx_write_rbuf
))
{
int64_t
ts
=
trx_wb_ts
[
trx_wb_part_read_index
];
int
empty_frames_ahead
=
ts
-
prepared_frame_count
;
empty_frames_ahead
=
empty_frames_ahead
<
n
?
empty_frames_ahead
:
n
;
if
(
empty_frames_ahead
>
0
)
{
for
(
int
j
=
0
;
j
<
empty_frames_ahead
;
j
++
)
{
*
((
uint16_t
*
)
(
RBUF_WRITE
(
tx_rbuf
,
uint8_t
)
+
20
))
=
htons
(
seq_id
++
);
rbuf_update_write_index
(
&
tx_rbuf
);
prepared_frame_count
++
;
}
}
else
if
(
empty_frames_ahead
==
0
)
{
int
m
=
trx_wb_part
[(
trx_wb_part_read_index
+
1
)
%
TRX_WB_MAX_PARTS
]
-
trx_write_rbuf
.
read_index
;
m
=
m
<
n
?
m
:
n
;
for
(
int
j
=
0
;
j
<
m
;
j
++
)
{
float
*
const
trx_samples
=
RBUF_READ
(
trx_write_rbuf
,
float
);
uint8_t
*
const
tx_frame
=
RBUF_WRITE
(
tx_rbuf
,
uint8_t
);
memset
(
samples_int
,
0
,
512
);
float_to_int16
(
samples_int
,
trx_samples
,
256
,
32767
);
encode_bf1
(
tx_frame
+
22
,
samples_int
);
encode_bf1
(
tx_frame
+
22
+
60
,
samples_int
+
64
);
encode_bf1
(
tx_frame
+
22
+
120
,
samples_int
+
128
);
encode_bf1
(
tx_frame
+
22
+
180
,
samples_int
+
192
);
*
((
uint16_t
*
)(
tx_frame
+
20
))
=
htons
(
seq_id
++
);
rbuf_update_write_index
(
&
tx_rbuf
);
rbuf_update_read_index
(
&
trx_write_rbuf
);
prepared_frame_count
++
;
}
if
(
m
==
0
)
trx_wb_part_read_index
=
(
trx_wb_part_read_index
+
1
)
%
TRX_WB_MAX_PARTS
;
}
else
{
log_error
(
"PREPARE_THREAD"
,
"missed trx_write timestamp"
);
}
}
else
{
*
((
uint16_t
*
)
(
RBUF_WRITE
(
tx_rbuf
,
uint8_t
)
+
20
))
=
htons
(
seq_id
++
);
rbuf_update_write_index
(
&
tx_rbuf
);
prepared_frame_count
++
;
}
}
else
{
if
(
!
tx_ready_buffer_full
)
{
tx_ready_buffer_full
=
1
;
pthread_mutex_lock
(
&
tx_ready_mutex
);
pthread_cond_signal
(
&
tx_ready_cond
);
pthread_mutex_unlock
(
&
tx_ready_mutex
);
}
pthread_mutex_lock
(
&
tx_mutex
);
pthread_cond_wait
(
&
tx_cond
,
&
tx_mutex
);
pthread_mutex_unlock
(
&
tx_mutex
);
}
}
pthread_exit
(
EXIT_SUCCESS
);
}
static
void
*
decompress_thread
(
void
*
p
)
{
cpu_set_t
mask
;
TRXEcpriState
*
s
=
(
TRXEcpriState
*
)
p
;
int
rx_ready
=
0
;
const
float
mult
=
1
.
/
32767
.;
log_info
(
"DECOMPRESS_THREAD"
,
"Thread init"
);
// Set thread CPU affinity
CPU_ZERO
(
&
mask
);
CPU_SET
(
s
->
decompress_affinity
,
&
mask
);
if
(
sched_setaffinity
(
0
,
sizeof
(
mask
),
&
mask
))
error
(
EXIT_FAILURE
,
errno
,
"Could not set CPU affinity to CPU %d
\n
"
,
s
->
decompress_affinity
);
for
(;;)
{
int
n
=
rbuf_read_amount
(
&
rx_rbuf
);
if
(
n
)
{
for
(
int
j
=
0
;
j
<
n
;
j
++
)
{
int16_t
samples_int
[
256
];
const
uint8_t
*
rx_samples
=
RBUF_READ
(
rx_rbuf
,
uint8_t
)
+
22
;
// TODO : analyze seq_id, ecpri packet type etc... ?
// TODO : set rx_ready at some point (when ?)
if
(
rx_ready
)
{
memset
(
samples_int
,
0
,
512
);
decode_bf1
(
samples_int
,
rx_samples
,
16
);
decode_bf1
(
samples_int
+
64
,
rx_samples
+
60
,
16
);
decode_bf1
(
samples_int
+
128
,
rx_samples
+
120
,
16
);
decode_bf1
(
samples_int
+
192
,
rx_samples
+
180
,
16
);
int16_to_float
(
RBUF_WRITE
(
trx_read_rbuf
,
float
),
samples_int
,
256
,
mult
);
rbuf_update_read_index
(
&
rx_rbuf
);
rbuf_update_write_index
(
&
trx_read_rbuf
);
sem_post
(
&
trx_read_sem
);
}
}
}
else
{
pthread_mutex_lock
(
&
rx_mutex
);
pthread_cond_wait
(
&
rx_cond
,
&
rx_mutex
);
pthread_mutex_unlock
(
&
rx_mutex
);
}
}
pthread_exit
(
EXIT_SUCCESS
);
}
static
int
start_threads
(
TRXEcpriState
*
s
)
{
pthread_t
recv_pthread
;
pthread_t
send_pthread
;
pthread_t
prepare_pthread
;
pthread_t
decompress_pthread
;
struct
sched_param
recv_param
;
struct
sched_param
send_param
;
struct
sched_param
prepare_param
;
struct
sched_param
decompress_param
;
pthread_attr_t
recv_attr
;
pthread_attr_t
send_attr
;
pthread_attr_t
prepare_attr
;
pthread_attr_t
decompress_attr
;
log_info
(
"TRX_ECPRI"
,
"Starting threads"
);
// Initialize pthread attributes (default values)
if
(
pthread_attr_init
(
&
recv_attr
))
log_error
(
"TRX_ECPRI"
,
"init pthread attributes failed
\n
"
);
// Set a specific stack size
if
(
pthread_attr_setstacksize
(
&
recv_attr
,
PTHREAD_STACK_MIN
))
log_error
(
"TRX_ECPRI"
,
"pthread setstacksize failed
\n
"
);
// Set scheduler policy and priority of pthread
if
(
pthread_attr_setschedpolicy
(
&
recv_attr
,
SCHED_FIFO
))
log_error
(
"TRX_ECPRI"
,
"pthread setschedpolicy failed
\n
"
);
recv_param
.
sched_priority
=
97
;
if
(
pthread_attr_setschedparam
(
&
recv_attr
,
&
recv_param
))
log_error
(
"TRX_ECPRI"
,
"pthread setschedparam failed
\n
"
);
/* Use scheduling parameters of attr */
if
(
pthread_attr_setinheritsched
(
&
recv_attr
,
PTHREAD_EXPLICIT_SCHED
))
log_error
(
"TRX_ECPRI"
,
"pthread setinheritsched failed
\n
"
);
if
(
pthread_attr_init
(
&
send_attr
))
log_error
(
"TRX_ECPRI"
,
"init pthread attributes failed
\n
"
);
if
(
pthread_attr_setstacksize
(
&
send_attr
,
PTHREAD_STACK_MIN
))
log_error
(
"TRX_ECPRI"
,
"pthread setstacksize failed
\n
"
);
if
(
pthread_attr_setschedpolicy
(
&
send_attr
,
SCHED_FIFO
))
log_error
(
"TRX_ECPRI"
,
"pthread setschedpolicy failed
\n
"
);
send_param
.
sched_priority
=
97
;
if
(
pthread_attr_setschedparam
(
&
send_attr
,
&
send_param
))
log_error
(
"TRX_ECPRI"
,
"pthread setschedparam failed
\n
"
);
if
(
pthread_attr_setinheritsched
(
&
send_attr
,
PTHREAD_EXPLICIT_SCHED
))
log_error
(
"TRX_ECPRI"
,
"pthread setinheritsched failed
\n
"
);
if
(
pthread_attr_init
(
&
prepare_attr
))
log_error
(
"TRX_ECPRI"
,
"init pthread attributes failed
\n
"
);
if
(
pthread_attr_setstacksize
(
&
prepare_attr
,
PTHREAD_STACK_MIN
))
log_error
(
"TRX_ECPRI"
,
"pthread setstacksize failed
\n
"
);
if
(
pthread_attr_setschedpolicy
(
&
prepare_attr
,
SCHED_FIFO
))
log_error
(
"TRX_ECPRI"
,
"pthread setschedpolicy failed
\n
"
);
prepare_param
.
sched_priority
=
97
;
if
(
pthread_attr_setschedparam
(
&
prepare_attr
,
&
prepare_param
))
log_error
(
"TRX_ECPRI"
,
"pthread setschedparam failed
\n
"
);
if
(
pthread_attr_setinheritsched
(
&
prepare_attr
,
PTHREAD_EXPLICIT_SCHED
))
log_error
(
"TRX_ECPRI"
,
"pthread setinheritsched failed
\n
"
);
if
(
pthread_attr_init
(
&
decompress_attr
))
log_error
(
"TRX_ECPRI"
,
"init pthread attributes failed
\n
"
);
if
(
pthread_attr_setstacksize
(
&
decompress_attr
,
PTHREAD_STACK_MIN
))
log_error
(
"TRX_ECPRI"
,
"pthread setstacksize failed
\n
"
);
if
(
pthread_attr_setschedpolicy
(
&
decompress_attr
,
SCHED_FIFO
))
log_error
(
"TRX_ECPRI"
,
"pthread setschedpolicy failed
\n
"
);
decompress_param
.
sched_priority
=
97
;
if
(
pthread_attr_setschedparam
(
&
decompress_attr
,
&
decompress_param
))
log_error
(
"TRX_ECPRI"
,
"pthread setschedparam failed
\n
"
);
if
(
pthread_attr_setinheritsched
(
&
decompress_attr
,
PTHREAD_EXPLICIT_SCHED
))
log_error
(
"TRX_ECPRI"
,
"pthread setinheritsched failed
\n
"
);
if
(
pthread_create
(
&
recv_pthread
,
NULL
,
recv_thread
,
s
))
error
(
EXIT_FAILURE
,
errno
,
"Couldn't create recv thread"
);
if
(
pthread_create
(
&
send_pthread
,
NULL
,
send_thread
,
s
))
error
(
EXIT_FAILURE
,
errno
,
"Couldn't create send thread"
);
if
(
pthread_create
(
&
prepare_pthread
,
NULL
,
prepare_thread
,
s
))
error
(
EXIT_FAILURE
,
errno
,
"Couldn't create prepare thread"
);
if
(
pthread_create
(
&
decompress_pthread
,
NULL
,
decompress_thread
,
s
))
error
(
EXIT_FAILURE
,
errno
,
"Couldn't create decompress thread"
);
return
0
;
}
int
start
(
TRXEcpriState
*
s
)
{
uint8_t
dst_mac
[
6
];
uint8_t
src_mac
[
6
];
uint8_t
ecpri_packet
[
PACKET_SIZE
];
struct
ether_header
*
eh
=
(
struct
ether_header
*
)
ecpri_packet
;
int
if_index
;
log_debug
(
"TRX_ECPRI"
,
"raw socket setup"
);
//set_latency_target();
seq_id
=
0
;
read_frame_count
=
0
;
sent_frame_count
=
0
;
prepared_frame_count
=
0
;
ecpri_period_mult
=
(
s
->
ecpri_period
*
FRAME_FREQ
)
/
1000000
;
rxtx_buf_size
=
(
3
*
ecpri_period_mult
);
RBUF_INIT
(
rx_rbuf
,
"RX ring buffer"
,
rxtx_buf_size
,
PACKET_SIZE
,
uint8_t
);
RBUF_INIT
(
tx_rbuf
,
"TX ring buffer"
,
rxtx_buf_size
,
PACKET_SIZE
,
uint8_t
);
RBUF_INIT
(
trx_read_rbuf
,
"TRXRead ring buffer"
,
TRX_BUF_MAX_SIZE
,
256
,
float
);
RBUF_INIT
(
trx_write_rbuf
,
"TRXWrite ring buffer"
,
TRX_BUF_MAX_SIZE
,
256
,
float
);
trx_wb_part_read_index
=
0
;
trx_wb_part_write_index
=
0
;
pthread_mutex_init
(
&
tx_mutex
,
NULL
);
pthread_mutex_init
(
&
rx_mutex
,
NULL
);
pthread_mutex_init
(
&
tx_ready_mutex
,
NULL
);
pthread_cond_init
(
&
tx_cond
,
NULL
);
pthread_cond_init
(
&
rx_cond
,
NULL
);
pthread_cond_init
(
&
tx_ready_cond
,
NULL
);
sem_init
(
&
trx_read_sem
,
0
,
0
);
memset
((
uint8_t
*
)
ecpri_packet
,
0
,
PACKET_SIZE
);
if
(
!
(
if_index
=
if_nametoindex
(
s
->
rec_if
)))
{
perror
(
"if_nametoindex"
);
return
1
;
}
if
(
sscanf
(
s
->
re_mac
,
"%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c"
,
&
dst_mac
[
0
],
&
dst_mac
[
1
],
&
dst_mac
[
2
],
&
dst_mac
[
3
],
&
dst_mac
[
4
],
&
dst_mac
[
5
])
!=
6
)
fprintf
(
stderr
,
"Invalid eRE MAC address
\n
"
);
if
(
sscanf
(
s
->
rec_mac
,
"%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%*c"
,
&
src_mac
[
0
],
&
src_mac
[
1
],
&
src_mac
[
2
],
&
src_mac
[
3
],
&
src_mac
[
4
],
&
src_mac
[
5
])
!=
6
)
fprintf
(
stderr
,
"Invalid eREC MAC address
\n
"
);
if
((
send_sockfd
=
socket
(
AF_PACKET
,
SOCK_RAW
,
htons
(
ETH_P_ALL
)))
==
-
1
)
{
perror
(
"Socket Error"
);
return
1
;
}
if
((
recv_sockfd
=
socket
(
AF_PACKET
,
SOCK_RAW
,
htons
(
ETH_P_ALL
)))
==
-
1
)
{
perror
(
"Socket Error"
);
return
1
;
}
connect_sk_addr
.
sll_ifindex
=
if_index
;
connect_sk_addr
.
sll_halen
=
ETH_ALEN
;
for
(
int
i
=
0
;
i
<
6
;
i
++
)
connect_sk_addr
.
sll_addr
[
i
]
=
dst_mac
[
i
];
log_debug
(
"TRX_ECPRI"
,
"bind"
);
for
(
int
i
=
0
;
i
<
6
;
i
++
)
eh
->
ether_shost
[
i
]
=
src_mac
[
i
];
for
(
int
i
=
0
;
i
<
6
;
i
++
)
eh
->
ether_dhost
[
i
]
=
dst_mac
[
i
];
/* Ethertype field */
eh
->
ether_type
=
htons
(
0xaefe
);
/* Standard Header */
ecpri_packet
[
14
]
=
0x10
;
// Protocol data revision 0x1, C = 0
// Message type = 0x00, IQ data
// Payload size
*
((
uint16_t
*
)
(
ecpri_packet
+
16
))
=
htons
(
244
);
*
((
uint16_t
*
)
(
ecpri_packet
+
18
))
=
htons
(
s
->
flow_id
);
for
(
int
i
=
0
;
i
<
rxtx_buf_size
;
i
++
)
{
//log_debug("TRX_ECPRI", "%d / %d - %d\n", i, rxtx_buf_size, tx_rbuf.len);
memcpy
(((
uint8_t
*
)
tx_rbuf
.
buffer
)
+
(
i
*
tx_rbuf
.
len
),
ecpri_packet
,
tx_rbuf
.
len
);
}
start_threads
(
s
);
return
0
;
}
static
void
trx_ecpri_end
(
TRXState
*
s1
)
{
log_info
(
"TRX_ECPRI"
,
"End"
);
TRXEcpriState
*
s
=
s1
->
opaque
;
free
(
s
);
}
static
void
trx_ecpri_write
(
TRXState
*
s1
,
trx_timestamp_t
timestamp
,
const
void
**
__samples
,
int
count
,
int
tx_port_index
,
TRXWriteMetadata
*
md
)
{
(
void
)
s1
;
float
**
_samples
=
(
float
**
)
__samples
;
int
write_count
=
count
>>
5
;
int64_t
ts
=
timestamp
>>
5
;
trx_wb_part
[
trx_wb_part_write_index
]
=
trx_write_rbuf
.
write_index
;
trx_wb_ts
[
trx_wb_part_write_index
]
=
ts
;
for
(
int
k
=
0
;
k
<
write_count
;
k
++
)
{
for
(
int
i
=
0
;
i
<
4
;
i
++
)
for
(
int
j
=
0
;
j
<
64
;
j
++
)
RBUF_WRITE
(
trx_write_rbuf
,
float
)[
i
*
64
+
j
]
=
_samples
[
i
][
j
+
(
k
<<
6
)];
rbuf_update_write_index
(
&
trx_write_rbuf
);
}
trx_wb_part_write_index
=
(
trx_wb_part_write_index
+
1
)
%
TRX_WB_MAX_PARTS
;
trx_wb_part
[
trx_wb_part_write_index
]
=
trx_write_rbuf
.
write_index
+
write_count
;
}
static
int
trx_ecpri_read
(
TRXState
*
s1
,
trx_timestamp_t
*
ptimestamp
,
void
**
__samples
,
int
count
,
int
rx_port_index
,
TRXReadMetadata
*
md
)
{
(
void
)
s1
;
float
**
_samples
=
(
float
**
)
__samples
;
int
read_count
=
count
>>
5
;
for
(
int
k
=
0
;
k
<
read_count
;
k
++
)
{
float
*
trx_samples
;
sem_wait
(
&
trx_read_sem
);
trx_samples
=
RBUF_READ
(
trx_read_rbuf
,
float
);
for
(
int
i
=
0
;
i
<
4
;
i
++
)
for
(
int
j
=
0
;
j
<
64
;
j
++
)
_samples
[
i
][
j
+
(
k
<<
6
)]
=
trx_samples
[
i
*
64
+
j
];
rbuf_update_read_index
(
&
trx_read_rbuf
);
}
*
ptimestamp
=
read_frame_count
<<
5
;
read_frame_count
+=
read_count
;
return
count
;
}
/* This function can be used to automatically set the sample
rate. Here we don't implement it, so the user has to force a given
sample rate with the "sample_rate" configuration option */
static
int
trx_ecpri_get_sample_rate
(
TRXState
*
s1
,
TRXFraction
*
psample_rate
,
int
*
psample_rate_num
,
int
sample_rate_min
)
{
return
-
1
;
}
static
int
trx_ecpri_start
(
TRXState
*
s1
,
const
TRXDriverParams
*
params
)
{
TRXEcpriState
*
s
=
s1
->
opaque
;
s
->
sample_rate
=
params
->
sample_rate
[
0
].
num
/
params
->
sample_rate
[
0
].
den
;
start
(
s
);
return
0
;
}
int
trx_driver_init
(
TRXState
*
s1
)
{
TRXEcpriState
*
s
;
double
val
;
// Lock all current and future pages from preventing of being paged to
// swap
if
(
mlockall
(
MCL_CURRENT
|
MCL_FUTURE
))
{
log_error
(
"TRX_ECPRI"
,
"mlockall failed"
);
}
log_info
(
"TRX_ECPRI"
,
"Init"
);
if
(
s1
->
trx_api_version
!=
TRX_API_VERSION
)
{
fprintf
(
stderr
,
"ABI compatibility mismatch between LTEENB and TRX driver (LTEENB ABI version=%d, TRX driver ABI version=%d)
\n
"
,
s1
->
trx_api_version
,
TRX_API_VERSION
);
return
-
1
;
}
s
=
malloc
(
sizeof
(
TRXEcpriState
));
memset
(
s
,
0
,
sizeof
(
*
s
));
trx_get_param_double
(
s1
,
&
val
,
"recv_affinity"
);
s
->
recv_affinity
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"send_affinity"
);
s
->
send_affinity
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"prepare_affinity"
);
s
->
send_affinity
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"decompress_affinity"
);
s
->
send_affinity
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"flow_id"
);
s
->
flow_id
=
(
int
)
val
;
trx_get_param_double
(
s1
,
&
val
,
"ecpri_period"
);
s
->
ecpri_period
=
(
int
)
val
;
s
->
re_mac
=
trx_get_param_string
(
s1
,
"re_mac"
);
s
->
rec_mac
=
trx_get_param_string
(
s1
,
"rec_mac"
);
s
->
rec_if
=
trx_get_param_string
(
s1
,
"rec_if"
);
s1
->
opaque
=
s
;
s1
->
trx_end_func
=
trx_ecpri_end
;
s1
->
trx_write_func2
=
trx_ecpri_write
;
s1
->
trx_read_func2
=
trx_ecpri_read
;
s1
->
trx_start_func
=
trx_ecpri_start
;
s1
->
trx_get_sample_rate_func
=
trx_ecpri_get_sample_rate
;
return
0
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment