Commit 5779b45d authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Implement buffering and coalescing of unicast messages.

parent aca69890
...@@ -553,6 +553,7 @@ main(int argc, char **argv) ...@@ -553,6 +553,7 @@ main(int argc, char **argv)
} }
} }
timeval_min(&tv, &update_flush_timeout); timeval_min(&tv, &update_flush_timeout);
timeval_min(&tv, &unicast_flush_timeout);
FD_ZERO(&readfds); FD_ZERO(&readfds);
if(timeval_compare(&tv, &now) > 0) { if(timeval_compare(&tv, &now) > 0) {
timeval_minus(&tv, &tv, &now); timeval_minus(&tv, &tv, &now);
...@@ -678,6 +679,11 @@ main(int argc, char **argv) ...@@ -678,6 +679,11 @@ main(int argc, char **argv)
flushupdates(); flushupdates();
} }
if(unicast_flush_timeout.tv_sec != 0) {
if(timeval_compare(&now, &unicast_flush_timeout) >= 0)
flush_unicast(1);
}
for(i = 0; i < numnets; i++) { for(i = 0; i < numnets; i++) {
if(!nets[i].up) if(!nets[i].up)
continue; continue;
......
...@@ -59,6 +59,12 @@ struct buffered_update buffered_updates[MAX_BUFFERED_UPDATES]; ...@@ -59,6 +59,12 @@ struct buffered_update buffered_updates[MAX_BUFFERED_UPDATES];
struct network *update_net = NULL; struct network *update_net = NULL;
int updates = 0; int updates = 0;
#define UNICAST_BUFSIZE 1024
int unicast_buffered = 0;
unsigned char *unicast_buffer = NULL;
struct neighbour *unicast_neighbour = NULL;
struct timeval unicast_flush_timeout = {0, 0};
unsigned short unsigned short
hash_id(const unsigned char *id) hash_id(const unsigned char *id)
{ {
...@@ -378,7 +384,8 @@ schedule_flush(struct network *net) ...@@ -378,7 +384,8 @@ schedule_flush(struct network *net)
timeval_minus_msec(&net->flush_timeout, &now) < msecs) timeval_minus_msec(&net->flush_timeout, &now) < msecs)
return; return;
net->flush_timeout.tv_usec = (now.tv_usec + msecs * 1000) % 1000000; net->flush_timeout.tv_usec = (now.tv_usec + msecs * 1000) % 1000000;
net->flush_timeout.tv_sec = now.tv_sec + (now.tv_usec / 1000 + msecs) / 1000; net->flush_timeout.tv_sec =
now.tv_sec + (now.tv_usec / 1000 + msecs) / 1000;
} }
void void
...@@ -394,6 +401,22 @@ schedule_flush_now(struct network *net) ...@@ -394,6 +401,22 @@ schedule_flush_now(struct network *net)
now.tv_sec + (now.tv_usec / 1000 + msecs) / 1000; now.tv_sec + (now.tv_usec / 1000 + msecs) / 1000;
} }
static void
schedule_unicast_flush(void)
{
int msecs;
if(!unicast_neighbour)
return;
msecs = jitter(unicast_neighbour->network, 1);
if(unicast_flush_timeout.tv_sec != 0 &&
timeval_minus_msec(&unicast_flush_timeout, &now) < msecs)
return;
unicast_flush_timeout.tv_usec = (now.tv_usec + msecs * 1000) % 1000000;
unicast_flush_timeout.tv_sec =
now.tv_sec + (now.tv_usec / 1000 + msecs) / 1000;
}
static void static void
start_message(struct network *net, int bytes) start_message(struct network *net, int bytes)
{ {
...@@ -537,31 +560,88 @@ send_request_resend(struct neighbour *neigh, ...@@ -537,31 +560,88 @@ send_request_resend(struct neighbour *neigh,
} }
static void void
send_unicast_packet(struct neighbour *neigh, unsigned char *buf, int buflen) flush_unicast(int dofree)
{ {
struct sockaddr_in6 sin6; struct sockaddr_in6 sin6;
int rc; int rc;
if(!neigh->network->up) if(unicast_buffered == 0)
return; goto done;
if(!unicast_neighbour->network->up)
goto done;
if(check_bucket(neigh->network)) { /* Preserve ordering of messages */
flushbuf(unicast_neighbour->network);
if(check_bucket(unicast_neighbour->network)) {
memset(&sin6, 0, sizeof(sin6)); memset(&sin6, 0, sizeof(sin6));
sin6.sin6_family = AF_INET6; sin6.sin6_family = AF_INET6;
memcpy(&sin6.sin6_addr, neigh->address, 16); memcpy(&sin6.sin6_addr, unicast_neighbour->address, 16);
sin6.sin6_port = htons(protocol_port); sin6.sin6_port = htons(protocol_port);
sin6.sin6_scope_id = neigh->network->ifindex; sin6.sin6_scope_id = unicast_neighbour->network->ifindex;
rc = babel_send(protocol_socket, rc = babel_send(protocol_socket,
packet_header, sizeof(packet_header), packet_header, sizeof(packet_header),
buf, buflen, unicast_buffer, unicast_buffered,
(struct sockaddr*)&sin6, sizeof(sin6)); (struct sockaddr*)&sin6, sizeof(sin6));
if(rc < 0) if(rc < 0)
perror("send(unicast)"); perror("send(unicast)");
} else { } else {
fprintf(stderr, "Warning: bucket full, dropping packet to %s.\n", fprintf(stderr,
neigh->network->ifname); "Warning: bucket full, dropping unicast packet"
"to %s (%s) if %s.\n",
format_address(unicast_neighbour->id),
format_address(unicast_neighbour->address),
unicast_neighbour->network->ifname);
} }
done:
VALGRIND_MAKE_MEM_UNDEFINED(unicast_buffer, UNICAST_BUFSIZE);
unicast_buffered = 0;
if(dofree && unicast_buffer) {
free(unicast_buffer);
unicast_buffer = NULL;
}
unicast_neighbour = NULL;
unicast_flush_timeout.tv_sec = 0;
unicast_flush_timeout.tv_usec = 0;
}
static void
send_unicast_message(struct neighbour *neigh,
unsigned char type,
unsigned char plen, unsigned char hop_count,
unsigned short seqno, unsigned short metric,
const unsigned char *address)
{
if(unicast_neighbour) {
if(neigh != unicast_neighbour ||
unicast_buffered + 24 >=
MIN(UNICAST_BUFSIZE, neigh->network->bufsize))
flush_unicast(0);
}
if(!unicast_buffer)
unicast_buffer = malloc(UNICAST_BUFSIZE);
if(!unicast_buffer) {
perror("malloc(unicast_buffer)");
return;
}
assert(unicast_buffered % 24 == 0);
unicast_buffer[unicast_buffered++] = type;
unicast_buffer[unicast_buffered++] = plen;
unicast_buffer[unicast_buffered++] = 0;
unicast_buffer[unicast_buffered++] = hop_count;
unicast_buffer[unicast_buffered++] = (seqno >> 8) & 0xFF;
unicast_buffer[unicast_buffered++] = seqno & 0xFF;
unicast_buffer[unicast_buffered++] = (metric >> 8) & 0xFF;
unicast_buffer[unicast_buffered++] = metric & 0xFF;
memcpy(unicast_buffer + unicast_buffered, address, 16);
unicast_buffered += 16;
schedule_unicast_flush();
} }
void void
...@@ -570,12 +650,9 @@ send_unicast_request(struct neighbour *neigh, ...@@ -570,12 +650,9 @@ send_unicast_request(struct neighbour *neigh,
unsigned char hop_count, unsigned short seqno, unsigned char hop_count, unsigned short seqno,
unsigned short router_hash) unsigned short router_hash)
{ {
unsigned char buf[24];
/* Make sure any buffered updates go out before this request. */ /* Make sure any buffered updates go out before this request. */
if(update_net == neigh->network) if(update_net == neigh->network)
flushupdates(); flushupdates();
flushbuf(neigh->network);
debugf("Sending unicast request to %s (%s) for %s (%d hops).\n", debugf("Sending unicast request to %s (%s) for %s (%d hops).\n",
format_address(neigh->id), format_address(neigh->id),
...@@ -583,20 +660,11 @@ send_unicast_request(struct neighbour *neigh, ...@@ -583,20 +660,11 @@ send_unicast_request(struct neighbour *neigh,
prefix ? format_prefix(prefix, plen) : "any", prefix ? format_prefix(prefix, plen) : "any",
hop_count); hop_count);
buf[0] = 2; if(prefix)
if(prefix) { send_unicast_message(neigh,
buf[1] = plen; 2, plen, hop_count, seqno, router_hash, prefix);
buf[2] = 0; else
buf[3] = hop_count; send_unicast_message(neigh, 2, 0xFF, 0, 0, 0, ones);
*(uint16_t*)(buf + 4) = seqno;
*(uint16_t*)(buf + 6) = router_hash;
memcpy(buf + 8, prefix, 16);
} else {
buf[1] = 0xFF;
memset(buf + 2, 0, 6);
memcpy(buf + 8, ones, 16);
}
send_unicast_packet(neigh, buf, 24);
} }
/* Return the source-id of the last buffered update message. */ /* Return the source-id of the last buffered update message. */
......
...@@ -34,6 +34,9 @@ extern int split_horizon; ...@@ -34,6 +34,9 @@ extern int split_horizon;
extern struct timeval update_flush_timeout; extern struct timeval update_flush_timeout;
extern const unsigned char packet_header[8]; extern const unsigned char packet_header[8];
extern struct neighbour *unicast_neighbour;
extern struct timeval unicast_flush_timeout;
unsigned short hash_id(const unsigned char *id) ATTRIBUTE ((pure)); unsigned short hash_id(const unsigned char *id) ATTRIBUTE ((pure));
void parse_packet(const unsigned char *from, struct network *net, void parse_packet(const unsigned char *from, struct network *net,
const unsigned char *packet, int len); const unsigned char *packet, int len);
...@@ -50,6 +53,7 @@ void send_request(struct network *net, ...@@ -50,6 +53,7 @@ void send_request(struct network *net,
void send_request_resend(struct neighbour *neigh, void send_request_resend(struct neighbour *neigh,
const unsigned char *prefix, unsigned char plen, const unsigned char *prefix, unsigned char plen,
unsigned short seqno, unsigned short router_hash); unsigned short seqno, unsigned short router_hash);
void flush_unicast(int dofree);
void send_unicast_request(struct neighbour *neigh, void send_unicast_request(struct neighbour *neigh,
const unsigned char *prefix, unsigned char plen, const unsigned char *prefix, unsigned char plen,
unsigned char hop_count, unsigned short seqno, unsigned char hop_count, unsigned short seqno,
......
...@@ -47,6 +47,8 @@ void ...@@ -47,6 +47,8 @@ void
flush_neighbour(struct neighbour *neigh) flush_neighbour(struct neighbour *neigh)
{ {
flush_neighbour_routes(neigh); flush_neighbour_routes(neigh);
if(unicast_neighbour == neigh)
flush_unicast(1);
memset(neigh, 0, sizeof(*neigh)); memset(neigh, 0, sizeof(*neigh));
VALGRIND_MAKE_MEM_UNDEFINED(neigh, sizeof(*neigh)); VALGRIND_MAKE_MEM_UNDEFINED(neigh, sizeof(*neigh));
neigh->hello_seqno = -2; neigh->hello_seqno = -2;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment