Commit e642aa94 authored by Joanne Hugé's avatar Joanne Hugé

Have the server receive packets in a real time thread

parent 32e46196
...@@ -38,14 +38,11 @@ static void process_options(int argc, char *argv[], thread_param_t *param, ...@@ -38,14 +38,11 @@ static void process_options(int argc, char *argv[], thread_param_t *param,
static void *packet_sending_thread(void *p) { static void *packet_sending_thread(void *p) {
struct timespec next; struct timespec next;
struct sched_param priority; struct sched_param priority;
thread_param_t *param = (thread_param_t *)p; thread_param_t *param = (thread_param_t *)p;
priority.sched_priority = param->priority; priority.sched_priority = param->priority;
if (sched_setscheduler(0, SCHED_FIFO, &priority))
int err = sched_setscheduler(0, SCHED_FIFO, &priority); error(EXIT_FAILURE, errno, "Couldn't set priority");
if (err) error(EXIT_FAILURE, errno, "Couldn't set priority");
clock_gettime(CLOCK_ID, &next); clock_gettime(CLOCK_ID, &next);
...@@ -57,7 +54,7 @@ static void *packet_sending_thread(void *p) { ...@@ -57,7 +54,7 @@ static void *packet_sending_thread(void *p) {
next.tv_nsec += param->interval; next.tv_nsec += param->interval;
if ( (unsigned int)next.tv_nsec >= NSEC_PER_SEC) { if ((unsigned int)next.tv_nsec >= NSEC_PER_SEC) {
next.tv_sec += 1; next.tv_sec += 1;
next.tv_nsec -= NSEC_PER_SEC; next.tv_nsec -= NSEC_PER_SEC;
} }
...@@ -93,12 +90,12 @@ int main(int argc, char *argv[]) { ...@@ -93,12 +90,12 @@ int main(int argc, char *argv[]) {
for (;;) { for (;;) {
usleep(main_param.refresh_rate); usleep(main_param.refresh_rate);
#ifdef DEBUG_ENABLE #ifdef DEBUG_ENABLE
printf("Nb cycles: %d\n", param.stats.nb_cycles); printf("Nb cycles: %d\n", param.stats.nb_cycles);
#endif #endif
if(param.max_cycles) if (param.max_cycles)
if(param.max_cycles == param.stats.nb_cycles) break; if (param.max_cycles == param.stats.nb_cycles) break;
} }
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
......
#include <arpa/inet.h> #include <arpa/inet.h>
#include <errno.h> #include <errno.h>
#include <error.h>
#include <netdb.h> #include <netdb.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <pthread.h>
#include <sched.h>
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
...@@ -16,30 +19,131 @@ ...@@ -16,30 +19,131 @@
#define SERVER_PORT "50000" #define SERVER_PORT "50000"
#define BUFFER_SIZE 1024 #define BUFFER_SIZE 1024
typedef struct thread_stat {
uint64_t min_interval;
uint64_t max_interval;
int packets_received;
} thread_stat_t;
typedef struct thread_param {
int priority;
thread_stat_t stats;
int sockfd;
} thread_param_t;
typedef struct main_param {
int refresh_rate;
} main_param_t;
static inline uint64_t calcdiff_ns(struct timespec t1, struct timespec t2); static inline uint64_t calcdiff_ns(struct timespec t1, struct timespec t2);
static inline uint64_t max(uint64_t a, uint64_t b); static inline uint64_t max(uint64_t a, uint64_t b);
static inline uint64_t min(uint64_t a, uint64_t b); static inline uint64_t min(uint64_t a, uint64_t b);
static void *get_in_addr(struct sockaddr *sa) { static void process_options(int argc, char *argv[], thread_param_t *param,
if (sa->sa_family == AF_INET) return &(((struct sockaddr_in *)sa)->sin_addr); main_param_t *main_param);
return &(((struct sockaddr_in6 *)sa)->sin6_addr); void init_server(thread_param_t *param);
}
int main() { // Real-time thread
int status; // Measures intervals between packet receptions
static void *packet_receiving_thread(void *p) {
struct timespec current, previous; struct timespec current, previous;
uint64_t diff; struct sched_param priority;
thread_param_t *param = (thread_param_t *)p;
int sockfd = 0; thread_stat_t *stats = &param->stats;
struct addrinfo hints, *servinfo, *servinfo_it; uint64_t diff = 0;
char buf[BUFFER_SIZE];
int bytes_received = 0; int bytes_received = 0;
struct sockaddr_storage client_addr; struct sockaddr_storage client_addr;
socklen_t addr_len; socklen_t addr_len;
char client_addr_str[INET6_ADDRSTRLEN];
char buf[BUFFER_SIZE]; addr_len = sizeof client_addr;
stats->min_interval = UINT64_MAX;
stats->max_interval = 0;
priority.sched_priority = param->priority;
if (sched_setscheduler(0, SCHED_FIFO, &priority))
error(EXIT_FAILURE, errno, "Couldn't set priority");
for (stats->packets_received = 0;; stats->packets_received++) {
bytes_received = recvfrom(param->sockfd, buf, BUFFER_SIZE - 1, 0,
(struct sockaddr *)&client_addr, &addr_len);
clock_gettime(CLOCK_MONOTONIC, &current);
if (bytes_received == -1)
error(EXIT_FAILURE, errno, "Error while attempting to receive packets");
if (stats->packets_received) {
diff = calcdiff_ns(current, previous);
if (diff < stats->min_interval) stats->min_interval = diff;
if (diff >= stats->max_interval) stats->max_interval = diff;
}
previous = current;
}
return NULL;
}
// Main thread, has non-real time priority
// Handles the IO and creates real time threads
int main(int argc, char *argv[]) {
pthread_t thread;
thread_param_t param;
main_param_t main_param;
// Default values
param.priority = 99;
main_param.refresh_rate = 50000;
// Process bash options
process_options(argc, argv, &param, &main_param);
init_server(&param);
usleep(10000);
if (pthread_create(&thread, NULL, packet_receiving_thread, (void *)&param))
error(EXIT_FAILURE, errno, "Couldn't create thread");
for (;;) {
usleep(main_param.refresh_rate);
printf("%ld - %ld, %ld (%d)\n",
param.stats.max_interval - param.stats.min_interval,
param.stats.min_interval, param.stats.max_interval,
param.stats.packets_received);
}
exit(EXIT_SUCCESS);
}
static void process_options(int argc, char *argv[], thread_param_t *param,
main_param_t *main_param) {
for (;;) {
int c = getopt(argc, argv, "p:r:");
if (c == -1) break;
switch (c) {
case 'p':
param->priority = atoi(optarg);
break;
case 'r':
main_param->refresh_rate = atoi(optarg);
break;
default:
exit(EXIT_FAILURE);
break;
}
}
}
void init_server(thread_param_t *param) {
int status;
struct addrinfo hints, *servinfo, *servinfo_it;
memset(&hints, 0, sizeof hints); memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC; hints.ai_family = AF_UNSPEC;
...@@ -49,22 +153,17 @@ int main() { ...@@ -49,22 +153,17 @@ int main() {
status = getaddrinfo(NULL, SERVER_PORT, &hints, &servinfo); status = getaddrinfo(NULL, SERVER_PORT, &hints, &servinfo);
if (status != 0) { if (status != 0) {
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(status)); fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(status));
printf("getaddrinfo: %s\n", gai_strerror(status)); exit(EXIT_FAILURE);
return 1;
} }
for (servinfo_it = servinfo; servinfo_it; for (servinfo_it = servinfo; servinfo_it;
servinfo_it = servinfo_it->ai_next) { servinfo_it = servinfo_it->ai_next) {
sockfd = socket(servinfo->ai_family, servinfo->ai_socktype, param->sockfd = socket(servinfo->ai_family, servinfo->ai_socktype,
servinfo->ai_protocol); servinfo->ai_protocol);
if (sockfd == -1) {
printf("Socket error, continuing...\n");
continue;
}
if (bind(sockfd, servinfo_it->ai_addr, servinfo_it->ai_addrlen) == -1) { if (bind(param->sockfd, servinfo_it->ai_addr, servinfo_it->ai_addrlen) ==
close(sockfd); -1) {
printf("Bind error, continuing...\n"); close(param->sockfd);
continue; continue;
} }
break; break;
...@@ -72,35 +171,7 @@ int main() { ...@@ -72,35 +171,7 @@ int main() {
freeaddrinfo(servinfo); freeaddrinfo(servinfo);
addr_len = sizeof client_addr; printf("waiting to receive...\n");
printf("waiting to recvfrom...\n");
while (1) {
if ((bytes_received =
recvfrom(sockfd, buf, BUFFER_SIZE - 1, 0,
(struct sockaddr *)&client_addr, &addr_len)) == -1) {
printf("recvfrom error\n");
return 2;
}
clock_gettime(CLOCK_MONOTONIC, &current);
inet_ntop(client_addr.ss_family,
get_in_addr((struct sockaddr *)&client_addr), client_addr_str,
sizeof(client_addr_str));
buf[bytes_received] = '\0';
diff = calcdiff_ns(current, previous);
printf("%lld: got packet from %s: %s (%d long)\n", diff, client_addr_str,
buf, bytes_received);
previous = current;
}
close(sockfd);
return 0;
} }
static inline uint64_t calcdiff_ns(struct timespec t1, struct timespec t2) { static inline uint64_t calcdiff_ns(struct timespec t1, struct timespec t2) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment