Commit b63c15af authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Make the flooding protocol use TCP.

This simplifies the code a lot, since it gives us the notion of a session
for free.  And congestion control, of course.

This is not quite ready yet: we don't wait for the peer's handshake before
initiating flooding, and we don't perform duplicate suppression.  Doing that
will require a different data structure.
parent d818237e
......@@ -18,30 +18,17 @@
#include "flood.h"
#include "util.h"
int flood_socket = -1;
int flood_port = 4444;
int server_socket = -1;
int server_port = -1;
struct datum **data = NULL;
int numdata = 0, maxdata = 0;
struct timespec flood_time = {0, 0};
static void
schedule_flood()
{
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
ts_add_msec(&flood_time, &now, 1);
}
static void (*datum_callback)(struct datum *, int) = NULL;
static int buffer_update(struct neighbour *neigh,
const unsigned char *key, int keylen, int acked);
static int record_unacked(struct neighbour *neigh,
const unsigned char *key, int keylen);
static int flush_unacked(struct neighbour *neigh,
const unsigned char *key, int keylen);
static int parse_tlv(struct neighbour *neigh);
static int handshake(struct neighbour *neigh);
static int dump_data(struct neighbour *neigh);
static int
seqno_compare(unsigned short s1, unsigned short s2)
......@@ -222,65 +209,137 @@ extend_datum(struct datum *datum, time_t extend)
return 0;
}
struct neighbour *neighbours = NULL;
int numneighbours = 0, maxneighbours = 0;
struct neighbour *neighs = NULL;
int numneighs = 0, maxneighs = 0;
static int send_dump_request(struct neighbour *neigh);
static int send_dump_reply(struct neighbour *neigh);
struct neighbour *
find_neighbour(int fd)
{
for(int i = 0; i < numneighs; i++) {
if(neighs[i].fd == fd)
return &neighs[i];
}
return NULL;
}
int
flood_setup(void (*callback)(struct datum *, int))
struct neighbour *
create_neighbour()
{
struct sockaddr_in6 sin6;
int s, rc, saved_errno;
int zero = 0, one = 1;
if(maxneighs <= numneighs) {
int n = maxneighs == 0 ? 8 : 2 * maxneighs;
struct neighbour *newneighs =
realloc(neighs, n * sizeof(struct neighbour));
if(newneighs != NULL) {
neighs = newneighs;
maxneighs = n;
}
}
if(maxneighs <= numneighs)
return NULL;
s = socket(PF_INET6, SOCK_DGRAM, 0);
if(s < 0)
return -1;
memset(&neighs[numneighs], 0, sizeof(struct neighbour));
neighs[numneighs].fd = -1;
numneighs++;
return &neighs[numneighs - 1];
}
rc = setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero));
if(rc < 0)
goto fail;
void
flush_neighbour(struct neighbour *neigh)
{
int i = neigh - neighs;
assert(i >= 0 && i < numneighs);
rc = fcntl(s, F_GETFL, 0);
if(rc < 0)
goto fail;
if(neigh->fd >= 0) {
close(neigh->fd);
neigh->fd = -1;
}
if(neigh->sin6 != NULL) {
free(neigh->sin6);
neigh->sin6 = NULL;
}
rc = fcntl(s, F_SETFL, (rc | O_NONBLOCK));
if(i < numneighs - 1)
memmove(neighs + i, neighs + i + 1,
(numneighs - i - 1) * sizeof(struct neighbour));
numneighs--;
}
static int
setup_socket(int fd)
{
int rc, saved_errno;
int zero = 0;
if(fd < 0) {
fd = socket(PF_INET6, SOCK_STREAM, 0);
if(fd < 0)
return -1;
rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero));
if(rc < 0)
goto fail;
}
rc = fcntl(fd, F_GETFL, 0);
if(rc < 0)
goto fail;
rc = fcntl(s, F_GETFD, 0);
rc = fcntl(fd, F_SETFL, (rc | O_NONBLOCK));
if(rc < 0)
goto fail;
rc = fcntl(s, F_SETFD, rc | FD_CLOEXEC);
rc = fcntl(fd, F_GETFD, 0);
if(rc < 0)
goto fail;
rc = setsockopt(s, IPPROTO_IPV6, IPV6_RECVPKTINFO, &one, sizeof(one));
rc = fcntl(fd, F_SETFD, rc | FD_CLOEXEC);
if(rc < 0)
goto fail;
return fd;
fail:
saved_errno = errno;
close(fd);
errno = saved_errno;
return -1;
}
int
flood_setup(void (*callback)(struct datum *, int))
{
struct sockaddr_in6 sin6;
int fd, rc, saved_errno;
datum_callback = callback;
if(server_port < 0)
return 0;
fd = setup_socket(-1);
if(fd < 0)
return -1;
memset(&sin6, 0, sizeof(sin6));
sin6.sin6_family = AF_INET6;
sin6.sin6_port = htons(flood_port);
rc = bind(s, (struct sockaddr*)&sin6, sizeof(sin6));
sin6.sin6_port = htons(server_port);
rc = bind(fd, (struct sockaddr*)&sin6, sizeof(sin6));
if(rc < 0)
goto fail;
flood_socket = s;
rc = listen(fd, 1024);
if(rc < 0)
goto fail;
datum_callback = callback;
server_socket = fd;
periodic_flood();
return 1;
fail:
saved_errno = errno;
close(s);
close(fd);
errno = saved_errno;
return -1;
}
......@@ -288,561 +347,408 @@ flood_setup(void (*callback)(struct datum *, int))
void
flood_cleanup()
{
close(flood_socket);
flood_socket = -1;
}
static void
commit_neighbour(struct neighbour *neigh, int update, int permanent)
{
if(update) {
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
neigh->time = now.tv_sec;
for(int i = 0; i < numneighs; i++) {
if(neighs[i].fd >= 0) {
close(neighs[i].fd);
neighs[i].fd = -1;
}
}
if(server_socket >= 0) {
close(server_socket);
server_socket = -1;
}
if(permanent)
neigh->permanent = 1;
}
static int
match(const struct sockaddr_in6 *a, const struct sockaddr_in6 *b)
int
flood_accept()
{
return a->sin6_port == b->sin6_port &&
memcmp(&a->sin6_addr, &b->sin6_addr, 16) == 0;
}
int fd, rc;
struct neighbour *neigh;
struct neighbour *
find_neighbour(struct sockaddr_in6 *sin6, int create, int update, int permanent)
{
for(int i = 0; i < numneighbours; i++) {
if(match(sin6, &neighbours[i].addr)) {
commit_neighbour(&neighbours[i], update, permanent);
return &neighbours[i];
}
fd = accept(server_socket, NULL, NULL);
if(fd < 0) {
if(errno != EAGAIN)
perror("accept");
return 0;
}
if(!create)
return NULL;
rc = setup_socket(fd);
if(rc < 0) {
perror("setup_socket(accept)");
close(fd);
return -1;
}
if(maxneighbours <= numneighbours) {
int n = maxneighbours == 0 ? 8 : 2 * maxneighbours;
struct neighbour *newneighbours =
realloc(neighbours, n * sizeof(struct neighbour));
if(newneighbours != NULL) {
neighbours = newneighbours;
maxneighbours = n;
}
neigh = create_neighbour();
if(neigh == NULL) {
close(fd);
return -1;
}
if(maxneighbours <= numneighbours)
return NULL;
memset(&neighbours[numneighbours], 0, sizeof(struct neighbour));
memcpy(&neighbours[numneighbours].addr, sin6, sizeof(struct sockaddr_in6));
neighbours[numneighbours].dump_done = 0;
commit_neighbour(&neighbours[numneighbours], update, permanent);
numneighbours++;
return &neighbours[numneighbours-1];
neigh->fd = fd;
rc = handshake(neigh);
if(rc < 0) {
close(neigh->fd);
neigh->fd = -1;
}
return 1;
}
void
flush_neighbour(struct neighbour *neigh)
static int
flood_reconnect(struct neighbour *neigh)
{
int i = neigh - neighbours;
assert(i >= 0 && i < numneighbours);
free(neighbours[i].unacked);
neighbours[i].unacked = NULL;
free(neighbours[i].pktinfo);
neighbours[i].pktinfo = NULL;
if(i < numneighbours - 1)
memmove(neighbours + i, neighbours + i + 1,
(numneighbours - i - 1) * sizeof(struct neighbour));
numneighbours--;
}
int fd, rc;
static void
parse_packet(struct sockaddr_in6 *from, struct in6_pktinfo *info,
const unsigned char *packet, int packetlen)
{
struct neighbour *neigh;
unsigned int bodylen;
int i;
fd = setup_socket(-1);
if(fd < 0) {
return -1;
}
if(packetlen < 4)
return;
rc = connect(fd, (struct sockaddr*)neigh->sin6, sizeof(struct sockaddr_in6));
if(rc < 0 && errno != EINPROGRESS) {
perror("connect");
close(fd);
/* let the connect loop recover */
return 0;
}
if(packet[0] != 44 || packet[1] != 0)
return;
neigh->fd = fd;
rc = handshake(neigh);
if(rc < 0) {
close(neigh->fd);
neigh->fd = -1;
return -1;
}
return 1;
}
DO_NTOHS(bodylen, packet + 2);
int
flood_connect(const struct sockaddr_in6 *sin6)
{
struct neighbour *neigh;
neigh = create_neighbour();
if(neigh == NULL)
return -1;
if(bodylen + 4 > packetlen) {
fprintf(stderr, "Received truncated packet.\n");
return;
if(neigh->sin6 == NULL)
neigh->sin6 = malloc(sizeof(struct sockaddr_in6));
if(neigh->sin6 == NULL) {
flush_neighbour(neigh);
return -1;
}
memcpy(neigh->sin6, sin6, sizeof(struct sockaddr_in6));
neigh = find_neighbour(from, 1, 1, 0);
if(neigh == NULL)
return;
return flood_reconnect(neigh);
}
if(info != NULL) {
if(neigh->pktinfo != NULL) {
if(memcmp(neigh->pktinfo, info, sizeof(struct in6_pktinfo)) != 0) {
free(neigh->pktinfo);
neigh->pktinfo = NULL;
}
int
flood_read(struct neighbour *neigh)
{
int rc;
if(neigh->in.cap == 0) {
neigh->in.buf = malloc(4096);
if(neigh->in.buf == NULL) {
close(neigh->fd);
neigh->fd = -1;
return -1;
}
if(neigh->pktinfo == NULL)
neigh->pktinfo = malloc(sizeof(struct in6_pktinfo));
if(neigh->pktinfo != NULL)
memcpy(neigh->pktinfo, info, sizeof(struct in6_pktinfo));
neigh->in.cap = 4096;
}
i = 0;
while(i < bodylen) {
const unsigned char *tlv = packet + 4 + i;
int len;
if(tlv[0] == 0) {
i++;
continue;
}
if(i + 1 > bodylen)
return;
len = tlv[1];
if(i + len + 2 > bodylen)
return;
if(neigh->in.len >= neigh->in.cap) {
fprintf(stderr, "Read buffer overflow.\n");
close(neigh->fd);
neigh->fd = -1;
return -1;
}
switch(tlv[0]) {
case 1:
debugf("<- PAD1\n");
break;
case 2: {
struct datum *datum;
unsigned char keylen;
unsigned short seqno;
unsigned int time;
int ack, doit, conflict;
if(len < 2) {
debugf("Truncated DATUM.\n");
goto skip;
}
ack = !!(tlv[2] & 0x80);
DO_NTOHS(seqno, tlv + 3);
DO_NTOHL(time, tlv + 5);
keylen = tlv[9];
if(len < keylen + 8) {
debugf("Truncated DATUM.\n");
goto skip;
}
debugf("<- DATUM %d (%d) %ld%s\n",
keylen <= 0 ? -1 : (int)tlv[10], keylen, (long)time,
ack ? " (ack)" : "");
datum = find_datum(tlv+10, keylen);
if(datum != NULL && seqno >= datum->seqno) {
flush_unacked(neigh, tlv+10, keylen);
}
datum = update_datum(tlv + 10, keylen, seqno,
tlv + 10 + keylen, len - keylen - 8,
time, &doit, &conflict);
if(doit && datum_callback != NULL)
datum_callback(datum, conflict);
if(doit || ack)
flood(datum, neigh, ack, doit);
}
break;
case 3:
debugf("<- DUMP\n");
send_dump_reply(neigh);
for(int i = 0; i < numneighbours; i++) {
for(int j = 0; j < numdata; j++)
record_unacked(&neighbours[i],
datum_key(data[j]), data[j]->keylen);
}
schedule_flood();
break;
case 4:
debugf("<- DUMP-ACK\n");
neigh->dump_done = 1;
break;
default:
debugf("Unknown TLV %d\n", tlv[0]);
rc = read(neigh->fd,
neigh->in.buf + neigh->in.len,
neigh->in.cap - neigh->in.len);
if(rc <= 0) {
if(errno == EAGAIN)
return 0;
if(rc < 0)
perror("read");
close(neigh->fd);
neigh->fd = -1;
}
neigh->in.len += rc;
while(neigh->in.len > 0) {
rc = parse_tlv(neigh);
if(rc < 0) {
close(neigh->fd);
neigh->fd = -1;
return -1;
}
if(rc == 0)
return 1;
skip:
i += 2 + len;
memmove(neigh->in.buf, neigh->in.buf + rc,
neigh->in.len - rc);
neigh->in.len -= rc;
}
flush_updates(neigh, 1);
return 1;
}
int
flood_listen(void)
flood_write(struct neighbour *neigh)
{
struct sockaddr_in6 from;
struct in6_pktinfo *info;
unsigned char buf[4096];
struct iovec iov[1];
struct msghdr msg;
int cmsglen = 100;
char cmsgbuf[cmsglen];
struct cmsghdr *cmsg = (struct cmsghdr*)cmsgbuf;
int rc;
iov[0].iov_base = buf;
iov[0].iov_len = 4096;
memset(&msg, 0, sizeof(msg));
msg.msg_name = &from;
msg.msg_namelen = sizeof(from);
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_control = cmsg;
msg.msg_controllen = cmsglen;
rc = recvmsg(flood_socket, &msg, 0);
if(rc < 0)
return rc;
if(neigh->fd < 0) {
fprintf(stderr, "flood_write called for dead neighbour!\n");
return -1;
}
info = NULL;
cmsg = CMSG_FIRSTHDR(&msg);
while(cmsg != NULL) {
if ((cmsg->cmsg_level == IPPROTO_IPV6) &&
(cmsg->cmsg_type == IPV6_PKTINFO)) {
info = (struct in6_pktinfo*)CMSG_DATA(cmsg);
break;
}
cmsg = CMSG_NXTHDR(&msg, cmsg);
if(neigh->out.len <= 0) {
fprintf(stderr, "flood_write called but nothing to do!\n");
return 0;
}
if(info == NULL) {
errno = EINVAL;
rc = write(neigh->fd, neigh->out.buf, neigh->out.len);
if(rc < 0) {
if(errno == EAGAIN)
return 0;
close(neigh->fd);
neigh->fd = -1;
return -1;
}
parse_packet(&from, info, buf, rc);
if(rc < neigh->out.len) {
memmove(neigh->out.buf, neigh->out.buf + rc, neigh->out.len - rc);
neigh->out.len -= rc;
} else {
neigh->out.len = 0;
}
return 1;
}
static int
send_neighbour(struct neighbour *neigh, unsigned char *buf, int buflen)
parse_tlv(struct neighbour *neigh)
{
struct timespec now;
struct msghdr msg;
struct iovec iov[1];
struct cmsghdr *cmsg;
union {
struct cmsghdr hdr;
char buf[CMSG_SPACE(sizeof(struct in6_pktinfo))];
} u;
clock_gettime(CLOCK_MONOTONIC, &now);
neigh->send_time = now.tv_sec;
iov[0].iov_base = buf;
iov[0].iov_len = buflen;
memset(&msg, 0, sizeof(msg));
msg.msg_name = (struct sockaddr*)&neigh->addr;
msg.msg_namelen = sizeof(neigh->addr);
msg.msg_iov = iov;
msg.msg_iovlen = 1;
if(neigh->pktinfo != NULL) {
memset(u.buf, 0, sizeof(u.buf));
msg.msg_control = u.buf;
msg.msg_controllen = CMSG_SPACE(sizeof(struct in6_pktinfo));
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = IPPROTO_IPV6;
cmsg->cmsg_type = IPV6_PKTINFO;
cmsg->cmsg_len = CMSG_LEN(sizeof(struct in6_pktinfo));
memcpy(CMSG_DATA(cmsg), neigh->pktinfo, sizeof(struct in6_pktinfo));
}
return sendmsg(flood_socket, &msg, 0);
int tpe, len;
unsigned char *body;
if(!neigh->handshake_received) {
if(neigh->in.len < 4)
return 0;
if(neigh->in.buf[0] != 44 ||
neigh->in.buf[1] != 1)
return -1;
debugf("<- Handshake\n");
neigh->handshake_received = 1;
if(!neigh->dump_sent) {
int rc = dump_data(neigh);
if(rc < 0)
return rc;
}
return 4;
}
}
if(neigh->in.len < 1)
return 0;
static int
send_dump_request(struct neighbour *neigh)
{
unsigned char buf[6] = {44, 0, 0, 2, 3, 0};
debugf("-> DUMP\n");
return send_neighbour(neigh, buf, 6);
}
if(neigh->in.buf[0] == 0) {
debugf("<- PAD1\n");
return 1;
}
static int
send_dump_reply(struct neighbour *neigh)
{
unsigned char buf[6] = {44, 0, 0, 2, 4, 0};
debugf("-> DUMP-ACK\n");
return send_neighbour(neigh, buf, 6);
}
if(neigh->in.len < 2)
return 0;
int
flush_updates(struct neighbour *neigh, int all)
{
unsigned char buf[1024] = {44, 0, 0, 0};
struct timespec now;
time_t time;
int i, n, rc = 0;
tpe = neigh->in.buf[0];
len = neigh->in.buf[1];
if(neigh->numbuffered == 0)
if(neigh->in.len < len + 2)
return 0;
clock_gettime(CLOCK_MONOTONIC, &now);
body = neigh->in.buf + 2;
i = 0;
for(n = 0; n < neigh->numbuffered; n++) {
switch(tpe) {
case 1:
debugf("<- PADN\n");
break;
case 2: {
struct datum *datum;
if(i == 0) {
buf[i] = 44; i++;
buf[i] = 0; i++;
buf[i] = 0; i++;
buf[i] = 0; i++;
unsigned char keylen;
unsigned short seqno;
unsigned int time;
int doit, conflict;
if(len < 7) {
debugf("Truncated Update.\n");
return -1;
}
datum = find_datum(neigh->buffered[n].key, neigh->buffered[n].keylen);
free(neigh->buffered[n].key);
neigh->buffered[n].key = NULL;
if(datum == NULL)
continue;
time = datum->time - now.tv_sec;
if(time <= 0)
continue;
if(time > 0xFFFFFFFF)
time = 0xFFFFFFFF;
buf[i++] = 2;
buf[i++] = 1 + 2 + 4 + 1 + datum->keylen + datum->vallen;
buf[i++] = neigh->buffered[n].acked ? 0x80 : 0;
DO_HTONS(buf + i, datum->seqno); i += 2;
DO_HTONL(buf + i, time); i += 4;
buf[i++] = datum->keylen;
memcpy(buf + i, datum->datum, datum->keylen + datum->vallen);
i += datum->keylen + datum->vallen;
debugf("-> DATUM %d (%d) %ld%s\n",
datum->keylen <= 0 ? -1 : datum->datum[0], datum->keylen,
(long)time, neigh->buffered[n].acked ? " (ack)" : "");
if(i >= 1024 - 32) {
if(!all)
break;
DO_HTONS(buf + 2, i - 4);
rc = send_neighbour(neigh, buf, i);
DO_NTOHS(seqno, body);
DO_NTOHL(time, body + 2);
keylen = body[6];
if(len < keylen + 7) {
debugf("Truncated Update.\n");
return -1;
}
debugf("<- Update %d (%d) %ld\n",
keylen <= 0 ? -1 : (int)body[7], keylen, (long)time);
datum = find_datum(body + 7, keylen);
datum = update_datum(body + 7, keylen, seqno,
body + 7 + keylen, len - keylen - 7,
time, &doit, &conflict);
if(doit) {
if(datum_callback != NULL)
datum_callback(datum, conflict);
flood(datum, neigh);
}
}
if(i > 0) {
DO_HTONS(buf + 2, i - 4);
rc = send_neighbour(neigh, buf, i);
}
if(n < neigh->numbuffered) {
memmove(neigh->buffered, neigh->buffered + n,
(neigh->numbuffered - n) * sizeof(struct buffered));
neigh->numbuffered -= n;
} else {
neigh->numbuffered = 0;
break;
default:
debugf("Unknown TLV %d\n", tpe);
}
return rc;
return 2 + len;
}
static int
buffer_update(struct neighbour *neigh, const unsigned char *key, int keylen,
int acked)
expand_buffer(struct buffer *buf, int len)
{
unsigned char *newkey;
if(neigh->buffered == NULL) {
neigh->buffered = malloc(MAXBUFFERED * sizeof(struct buffered));
if(neigh->buffered == NULL)
return -1;
}
if(neigh->numbuffered >= MAXBUFFERED)
flush_updates(neigh, 0);
assert(neigh->numbuffered < MAXBUFFERED);
int cap;
unsigned char *b;
newkey = malloc(keylen);
if(newkey == NULL)
return -1;
memcpy(newkey, key, keylen);
if(buf->cap - buf->len >= len)
return 0;
neigh->buffered[neigh->numbuffered].key = newkey;
neigh->buffered[neigh->numbuffered].keylen = keylen;
neigh->buffered[neigh->numbuffered].acked = acked;
neigh->numbuffered++;
cap = buf->len + len;
if(cap < buf->cap * 2)
cap = buf->cap * 2;
b = malloc(cap);
if(b == NULL)
return -1;
memcpy(b, buf->buf, buf->len);
free(buf->buf);
buf->buf = b;
buf->cap = cap;
return 1;
}
static const unsigned char hs[4] = {44, 1, 0, 0};
static int
send_keepalive(struct neighbour *neigh)
buffer_handshake(struct neighbour *neigh)
{
unsigned char buf[4] = {44, 0, 0, 0};
debugf("-> Keepalive\n");
return send_neighbour(neigh, buf, 4);
int rc;
rc = expand_buffer(&neigh->out, 4);
if(rc < 0)
return -1;
memcpy(neigh->out.buf + neigh->out.len, hs, 4);
neigh->out.len += 4;
debugf("-> Handshake\n");
return 1;
}
static int
neighbour_alive(struct neighbour *neigh, time_t now)
buffer_tlv(struct neighbour *neigh, int tpe, int len, unsigned char *body)
{
if(neigh->permanent)
return 1;
return neigh->time > now - 240;
int rc;
rc = expand_buffer(&neigh->out, 2 + len);
if(rc < 0)
return -1;
neigh->out.buf[neigh->out.len++] = tpe;
neigh->out.buf[neigh->out.len++] = len;
memcpy(neigh->out.buf + neigh->out.len, body, len);
neigh->out.len += len;
return 1;
}
void
flood(struct datum *datum, struct neighbour *neigh, int ack, int doit)
static int
buffer_update(struct neighbour *neigh, struct datum *datum)
{
int len = 2 + 4 + 1 + datum->keylen + datum->vallen;
unsigned char body[len];
struct timespec now;
if(ack && neigh != NULL)
buffer_update(neigh, datum_key(datum), datum->keylen, 0);
time_t time;
clock_gettime(CLOCK_MONOTONIC, &now);
if(doit) {
for(int i = 0; i < numneighbours; i++) {
if(neighbour_alive(&neighbours[i], now.tv_sec) &&
&neighbours[i] != neigh)
record_unacked(&neighbours[i], datum_key(datum), datum->keylen);
}
}
schedule_flood();
}
static struct unacked *
find_unacked(struct neighbour *neigh, const unsigned char *key, int keylen)
{
for(int i = 0; i < neigh->numunacked; i++) {
if(neigh->unacked[i].keylen == keylen &&
memcmp(neigh->unacked[i].key, key, keylen) == 0)
return &neigh->unacked[i];
}
return NULL;
time = datum->time - now.tv_sec;
if(time < 0)
time = 0;
if(time > 0xFFFFFFFF)
time = 0xFFFFFFFF;
DO_HTONS(body, datum->seqno);
DO_HTONL(body + 2, (unsigned int)time);
body[6] = datum->keylen;
memcpy(body + 7, datum->datum, datum->keylen);
memcpy(body + 7 + datum->keylen, datum->datum + datum->keylen,
datum->vallen);
debugf("-> Update\n");
return buffer_tlv(neigh, 2, len, body);
}
static int
record_unacked(struct neighbour *neigh, const unsigned char *key, int keylen)
dump_data(struct neighbour *neigh)
{
struct unacked *unacked;
struct timespec now;
unsigned char *newkey;
clock_gettime(CLOCK_MONOTONIC, &now);
unacked = find_unacked(neigh, key, keylen);
if(unacked != NULL) {
unacked->count = 0;
unacked->time = now.tv_sec;
return 0;
}
if(neigh->numunacked >= neigh->maxunacked) {
struct unacked *n;
int count = neigh->maxunacked * 3 / 2;
if(count < 8)
count = 8;
n = realloc(neigh->unacked, count * sizeof(struct unacked));
if(n == NULL)
return -1;
neigh->unacked = n;
neigh->maxunacked = count;
for(int i = 0; i < numdata; i++) {
int rc = buffer_update(neigh, data[i]);
if(rc < 0)
return rc;
}
newkey = malloc(keylen);
if(newkey == NULL)
return -1;
memcpy(newkey, key, keylen);
neigh->unacked[neigh->numunacked].count = 0;
neigh->unacked[neigh->numunacked].key = newkey;
neigh->unacked[neigh->numunacked].keylen = keylen;
neigh->unacked[neigh->numunacked].time = now.tv_sec;
neigh->numunacked++;
schedule_flood();
neigh->dump_sent = 1;
return 1;
}
static int
flush_unacked(struct neighbour *neigh, const unsigned char *key, int keylen)
handshake(struct neighbour *neigh)
{
int i;
struct unacked *unacked;
int rc;
if(neigh->fd < 0)
return -1;
unacked = find_unacked(neigh, key, keylen);
if(unacked == NULL)
return 0;
i = unacked - neigh->unacked;
assert(i >= 0 && i < neigh->numunacked);
rc = buffer_handshake(neigh);
if(rc < 0)
return rc;
free(neigh->unacked[i].key);
neigh->unacked[i].key = NULL;
if(neigh->handshake_received) {
rc = dump_data(neigh);
if(rc < 0)
return rc;
}
if(i < neigh->numunacked - 1)
memmove(neigh->unacked + i, neigh->unacked + i + 1,
(neigh->numunacked - i - 1) * sizeof(struct unacked));
neigh->numunacked--;
return 1;
}
static struct timespec expire_neighbours_time = {0, 0};
static void
expire_neighbours()
void
flood(struct datum *datum, struct neighbour *neigh)
{
struct timespec now;
int i;
clock_gettime(CLOCK_MONOTONIC, &now);
i = 0;
while(i < numneighbours) {
if(neighbour_alive(&neighbours[i], now.tv_sec)) {
if(neighbours[i].send_time < now.tv_sec - 60)
send_keepalive(&neighbours[i]);
i++;
} else {
flush_neighbour(&neighbours[i]);
for(int i = 0; i < numneighs; i++) {
if(neighs[i].fd >= 0 && &neighs[i] != neigh) {
int rc;
rc = buffer_update(&neighs[i], datum);
if(rc < 0) {
close(neighs[i].fd);
neighs[i].fd = -1;
}
}
}
}
struct timespec expire_neighs_time = {0, 0};
void
periodic_flood()
expire_neighs()
{
struct timespec now;
int work = 0;
clock_gettime(CLOCK_MONOTONIC, &now);
if(ts_compare(&expire_neighbours_time, &now) <= 0) {
expire_neighbours();
expire_neighbours_time = now;
expire_neighbours_time.tv_sec += 10;
}
for(int i = 0; i < numneighbours; i++) {
if(!neighbours[i].dump_done && neighbours[i].dump_request_count < 4) {
work = 1;
send_dump_request(&neighbours[i]);
neighbours[i].dump_request_count++;
}
if(neighbours[i].numunacked > 0)
work = 1;
for(int j = 0; j < neighbours[i].numunacked; j++) {
struct unacked *unacked = &neighbours[i].unacked[j];
struct timespec soon = {unacked->time, 0};
if(unacked->count > 0)
soon.tv_sec += 1 << (unacked->count - 1);
if(ts_compare(&soon, &now) <= 0) {
buffer_update(&neighbours[i], unacked->key, unacked->keylen, 1);
unacked->count++;
}
int i = 0;
while(i < numneighs) {
if(neighs[i].fd >= 0) {
i++;
} else if(neighs[i].sin6 != NULL) {
flood_reconnect(&neighs[i]);
i++;
} else {
flush_neighbour(&neighs[i]);
}
flush_updates(&neighbours[i], 1);
}
if(work) {
flood_time = now;
flood_time.tv_sec += 1;
} else {
flood_time.tv_sec = 0;
flood_time.tv_nsec = 0;
}
clock_gettime(CLOCK_MONOTONIC, &expire_neighs_time);
expire_neighs_time.tv_sec += 30;
}
......@@ -18,13 +18,11 @@ datum_val(const struct datum *datum)
return datum->datum + datum->keylen;
}
extern int flood_port;
extern int flood_socket;
extern int server_port;
extern int server_socket;
extern struct datum **data;
extern int numdata, maxdata;
extern struct timespec flood_time;
struct unacked {
int count;
unsigned char *key;
......@@ -38,24 +36,23 @@ struct buffered {
int acked;
};
#define MAXBUFFERED 100
struct buffer {
unsigned char *buf;
int len, cap;
};
struct neighbour {
struct sockaddr_in6 addr;
struct in6_pktinfo *pktinfo;
int permanent;
time_t time;
time_t send_time;
struct unacked *unacked;
int numunacked, maxunacked;
struct buffered *buffered;
int numbuffered;
int dump_request_count;
int dump_done;
int fd;
int handshake_received;
int dump_sent;
struct sockaddr_in6 *sin6;
struct buffer in, out;
};
extern struct neighbour *neighbours;
extern int numneighbours, maxneighbours;
extern struct neighbour *neighs;
extern int numneighs, maxneighs;
extern struct timespec expire_neighs_time;
struct datum *find_datum(const unsigned char *key, int keylen);
struct datum *update_datum(const unsigned char *key, int keylen,
......@@ -67,9 +64,9 @@ time_t datum_remaining(const struct datum *datum);
int extend_datum(struct datum *datum, time_t extend);
int flood_setup(void (*callback)(struct datum *, int));
void flood_cleanup(void);
int flood_listen(void);
struct neighbour *
find_neighbour(struct sockaddr_in6 *sin6, int create, int update, int permanent);
void flood(struct datum *datum, struct neighbour *neigh, int ack, int doit);
void periodic_flood(void);
int flush_updates(struct neighbour *neigh, int all);
int flood_accept(void);
int flood_connect(const struct sockaddr_in6* sin6);
int flood_read(struct neighbour *neigh);
int flood_write(struct neighbour *neigh);
void flood(struct datum *datum, struct neighbour *except);
void expire_neighs(void);
......@@ -119,7 +119,7 @@ update_lease(const unsigned char *mac, int ipv6,
doit = extend_datum(datum, time);
if(doit_return)
*doit_return = doit;
flood(datum, NULL, 0, 1);
flood(datum, NULL);
return datum;
}
......@@ -136,7 +136,7 @@ update_lease(const unsigned char *mac, int ipv6,
if(doit_return)
*doit_return = doit;
update_client_routes(mac, lease_address(datum, ipv6), ipv6);
flood(datum, NULL, 0, 1);
flood(datum, NULL);
return datum;
}
......@@ -199,7 +199,7 @@ update_association(struct interface *interface, const unsigned char *mac,
if(datum->vallen == 8 &&
memcmp(datum_val(datum), myid, 8) == 0) {
extend_datum(datum, time);
flood(datum, NULL, 0, 1);
flood(datum, NULL);
return client;
} else {
seqno = datum->seqno + 1;
......@@ -207,7 +207,7 @@ update_association(struct interface *interface, const unsigned char *mac,
}
datum = update_datum(key, 7, seqno, myid, 8, time, NULL, NULL);
flood(datum, NULL, 0, 1);
flood(datum, NULL);
return client;
}
......@@ -230,5 +230,5 @@ flush_association(const unsigned char *mac, int time)
seqno = datum->seqno + 1;
datum = update_datum(key, 7, seqno, NULL, 0, time, NULL, NULL);
flood(datum, NULL, 0, 1);
flood(datum, NULL);
}
......@@ -211,7 +211,7 @@ main(int argc, char **argv)
p = strtol(optarg, &end, 0);
if(*end != '\0' || p <= 0 || p > 0xFFFF)
goto usage;
flood_port = p;
server_port = p;
}
break;
case 'P': {
......@@ -270,7 +270,7 @@ main(int argc, char **argv)
} else
goto usage;
sin6.sin6_port = htons(port);
find_neighbour(&sin6, 1, 0, 1);
flood_connect(&sin6);
} else {
goto usage;
}
......@@ -339,47 +339,50 @@ main(int argc, char **argv)
}
while(1) {
fd_set readfds;
fd_set readfds, writefds;
int nls = netlink_socket();
int maxfd;
struct timespec deadline;
struct timespec now, deadline;
FD_ZERO(&readfds);
FD_ZERO(&writefds);
FD_SET(nls, &readfds);
maxfd = nls;
if(numinterfaces > 0) {
FD_SET(ra_socket, &readfds);
maxfd = max(maxfd, ra_socket);
FD_SET(dhcpv4_socket, &readfds);
maxfd = max(maxfd, dhcpv4_socket);
}
FD_SET(flood_socket, &readfds);
maxfd = max(nls, flood_socket);
if(numinterfaces > 0)
maxfd = max(maxfd, max(ra_socket, dhcpv4_socket));
if(flood_time.tv_sec > 0) {
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
ts_minus(&deadline, &flood_time, &now);
if(deadline.tv_sec < 0) {
deadline.tv_sec = 0;
deadline.tv_nsec = 0;
FD_SET(server_socket, &readfds);
maxfd = max(maxfd, server_socket);
for(int i = 0; i < numneighs; i++) {
if(neighs[i].fd >= 0) {
if(neighs[i].out.len > 0)
FD_SET(neighs[i].fd, &writefds);
FD_SET(neighs[i].fd, &readfds);
maxfd = max(maxfd, neighs[i].fd);
}
}
rc = pselect(maxfd + 1, &readfds, NULL, NULL,
flood_time.tv_sec > 0 ? &deadline : NULL, NULL);
clock_gettime(CLOCK_MONOTONIC, &now);
ts_minus(&deadline, &expire_neighs_time, &now);
rc = pselect(maxfd + 1, &readfds, &writefds, NULL, &deadline, NULL);
if(rc < 0 && errno != EINTR) {
perror("pselect");
sleep(1);
}
clock_gettime(CLOCK_MONOTONIC, &now);
if(exiting)
break;
if(dumping) {
static const char zeroes[8] = {0};
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
printf("Interfaces");
for(int i = 0; i < numinterfaces; i++)
printf(" %s", interfaces[i].ifname);
......@@ -462,15 +465,8 @@ main(int argc, char **argv)
printf(".\n");
}
printf("\n");
for(int i = 0; i < numneighbours; i++) {
char buf[INET6_ADDRSTRLEN];
inet_ntop(AF_INET6, &neighbours[i].addr.sin6_addr,
buf, sizeof(buf));
printf("Neighbour %s:%d %ds %ds%s.\n",
buf, ntohs(neighbours[i].addr.sin6_port),
(int)(now.tv_sec - neighbours[i].time),
(int)(now.tv_sec - neighbours[i].send_time),
neighbours[i].permanent ? " (permanent)" : "");
for(int i = 0; i < numneighs; i++) {
printf("Neighbour %d.\n", neighs[i].fd);
}
fflush(stdout);
......@@ -478,33 +474,35 @@ main(int argc, char **argv)
dumping = 0;
}
if(flood_time.tv_sec > 0) {
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
if(ts_compare(&flood_time, &now) <= 0) {
periodic_flood();
if(rc >= 0) {
if(FD_ISSET(nls, &readfds)) {
rc = netlink_listen();
if(rc < 0)
nl_perror(rc, "netlink_listen");
}
}
if(rc <= 0)
continue;
if(FD_ISSET(nls, &readfds)) {
rc = netlink_listen();
if(rc < 0)
nl_perror(rc, "netlink_listen");
}
if(FD_ISSET(server_socket, &readfds))
flood_accept();
if(FD_ISSET(flood_socket, &readfds))
flood_listen();
if(numinterfaces > 0) {
if(FD_ISSET(ra_socket, &readfds))
receive_rs();
for(int i = 0; i < numneighs; i++) {
if(neighs[i].fd >= 0) {
if(FD_ISSET(neighs[i].fd, &readfds))
flood_read(&neighs[i]);
if(FD_ISSET(neighs[i].fd, &writefds))
flood_write(&neighs[i]);
}
}
if(numinterfaces > 0) {
if(FD_ISSET(ra_socket, &readfds))
receive_rs();
if(FD_ISSET(dhcpv4_socket, &readfds))
dhcpv4_receive();
if(FD_ISSET(dhcpv4_socket, &readfds))
dhcpv4_receive();
}
}
if(ts_compare(&now, &expire_neighs_time) >= 0)
expire_neighs();
}
client_cleanup();
......
......@@ -38,8 +38,8 @@ DHCPv4 or IPv6 RA. This option may be specified multiple times, in which
case all prefixes will be announced to clients.
.TP
.BI \-f " port"
Set the local UDP port used by the flooding protocol. If this is not set,
flooding is disabled.
Set the server TCP port used by the flooding protocol. If this is not
set, we don't act as a server.
.TP
.BI \-F " address:port"
Specify the address of a remote peer for the flooding protocol. There is
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment