Commit 94dd4c2b authored by Rusty Russell's avatar Rusty Russell

ccan/io: implement timeouts.

We do this by the simplest method: return from io_loop() and let the caller
sort them out.
Signed-off-by: default avatarRusty Russell <rusty@rustcorp.com.au>
parent e92e2f65
...@@ -118,7 +118,7 @@ ...@@ -118,7 +118,7 @@
* io_set_finish(reader, finish, &from); * io_set_finish(reader, finish, &from);
* io_new_conn(NULL, STDOUT_FILENO, write_out, &from); * io_new_conn(NULL, STDOUT_FILENO, write_out, &from);
* *
* io_loop(); * io_loop(NULL, NULL);
* wait(&status); * wait(&status);
* *
* return WIFEXITED(status) ? WEXITSTATUS(status) : 2; * return WIFEXITED(status) ? WEXITSTATUS(status) : 2;
...@@ -133,7 +133,10 @@ int main(int argc, char *argv[]) ...@@ -133,7 +133,10 @@ int main(int argc, char *argv[])
return 1; return 1;
if (strcmp(argv[1], "depends") == 0) { if (strcmp(argv[1], "depends") == 0) {
printf("ccan/list\n");
printf("ccan/tal\n"); printf("ccan/tal\n");
printf("ccan/time\n");
printf("ccan/timer\n");
printf("ccan/typesafe_cb\n"); printf("ccan/typesafe_cb\n");
return 0; return 0;
} }
......
...@@ -6,6 +6,9 @@ ...@@ -6,6 +6,9 @@
#include <stdbool.h> #include <stdbool.h>
#include <unistd.h> #include <unistd.h>
struct timers;
struct list_head;
/** /**
* struct io_plan - a plan for input or output. * struct io_plan - a plan for input or output.
* *
...@@ -169,7 +172,7 @@ struct io_listener *io_new_listener_(const tal_t *ctx, int fd, ...@@ -169,7 +172,7 @@ struct io_listener *io_new_listener_(const tal_t *ctx, int fd,
* ... * ...
* struct io_listener *l = do_listen("8111"); * struct io_listener *l = do_listen("8111");
* if (l) { * if (l) {
* io_loop(); * io_loop(NULL, NULL);
* io_close_listener(l); * io_close_listener(l);
* } * }
*/ */
...@@ -553,14 +556,17 @@ struct io_plan *io_close_cb(struct io_conn *, void *unused); ...@@ -553,14 +556,17 @@ struct io_plan *io_close_cb(struct io_conn *, void *unused);
/** /**
* io_loop - process fds until all closed on io_break. * io_loop - process fds until all closed on io_break.
* @timers - timers which are waiting to go off (or NULL for none)
* @expired - a list filled with expired timers (can be NULL if @timers is)
* *
* This is the core loop; it exits with the io_break() arg, or NULL if * This is the core loop; it exits with the io_break() arg, or NULL if
* all connections and listeners are closed. * all connections and listeners are closed, or with @expired set to a
* list of expired timers (if @timers isn't NULL).
* *
* Example: * Example:
* io_loop(); * io_loop(NULL, NULL);
*/ */
void *io_loop(void); void *io_loop(struct timers *timers, struct list_head *expired);
/** /**
* io_conn_fd - get the fd from a connection. * io_conn_fd - get the fd from a connection.
......
...@@ -8,6 +8,9 @@ ...@@ -8,6 +8,9 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <limits.h> #include <limits.h>
#include <errno.h> #include <errno.h>
#include <ccan/list/list.h>
#include <ccan/time/time.h>
#include <ccan/timer/timer.h>
static size_t num_fds = 0, max_fds = 0, num_waiting = 0; static size_t num_fds = 0, max_fds = 0, num_waiting = 0;
static struct pollfd *pollfds = NULL; static struct pollfd *pollfds = NULL;
...@@ -223,12 +226,19 @@ static bool handle_always(void) ...@@ -223,12 +226,19 @@ static bool handle_always(void)
} }
/* This is the main loop. */ /* This is the main loop. */
void *io_loop(void) void *io_loop(struct timers *timers, struct list_head *expired)
{ {
void *ret; void *ret;
/* if timers is NULL, expired must be. If not, not. */
assert(!timers == !expired);
/* Make sure this is empty if we exit for some other reason. */
if (expired)
list_head_init(expired);
while (!io_loop_return) { while (!io_loop_return) {
int i, r; int i, r, ms_timeout = -1;
if (close_conns()) { if (close_conns()) {
/* Could have started/finished more. */ /* Could have started/finished more. */
...@@ -247,7 +257,28 @@ void *io_loop(void) ...@@ -247,7 +257,28 @@ void *io_loop(void)
/* You can't tell them all to go to sleep! */ /* You can't tell them all to go to sleep! */
assert(num_waiting); assert(num_waiting);
r = poll(pollfds, num_fds, -1); if (timers) {
struct timeabs now, first;
now = time_now();
/* Call functions for expired timers. */
timers_expire(timers, now, expired);
if (!list_empty(expired))
break;
/* Now figure out how long to wait for the next one. */
if (timer_earliest(timers, &first)) {
uint64_t next;
next = time_to_msec(time_between(first, now));
if (next < INT_MAX)
ms_timeout = next;
else
ms_timeout = INT_MAX;
}
}
r = poll(pollfds, num_fds, ms_timeout);
if (r < 0) if (r < 0)
break; break;
......
...@@ -88,7 +88,7 @@ int main(void) ...@@ -88,7 +88,7 @@ int main(void)
exit(0); exit(0);
} }
freeaddrinfo(addrinfo); freeaddrinfo(addrinfo);
ok1(io_loop() == &state + 1); ok1(io_loop(NULL, NULL) == &state + 1);
ok1(state == 2); ok1(state == 2);
io_close_listener(l); io_close_listener(l);
ok1(wait(&state)); ok1(wait(&state));
......
...@@ -99,7 +99,7 @@ int main(void) ...@@ -99,7 +99,7 @@ int main(void)
exit(0); exit(0);
} }
freeaddrinfo(addrinfo); freeaddrinfo(addrinfo);
ok1(io_loop() == d); ok1(io_loop(NULL, NULL) == d);
ok1(d->state == 2); ok1(d->state == 2);
ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
free(d); free(d);
......
...@@ -106,7 +106,7 @@ int main(void) ...@@ -106,7 +106,7 @@ int main(void)
free(d); free(d);
exit(0); exit(0);
} }
ok1(io_loop() == d); ok1(io_loop(NULL, NULL) == d);
ok1(d->state == 2); ok1(d->state == 2);
ok1(d->bytes > 0); ok1(d->bytes > 0);
ok1(d->bytes <= sizeof(d->buf)); ok1(d->bytes <= sizeof(d->buf));
...@@ -125,7 +125,7 @@ int main(void) ...@@ -125,7 +125,7 @@ int main(void)
exit(0); exit(0);
} }
d->state = 0; d->state = 0;
ok1(io_loop() == d); ok1(io_loop(NULL, NULL) == d);
ok1(d->state == 2); ok1(d->state == 2);
ok1(d->bytes > 0); ok1(d->bytes > 0);
ok1(d->bytes <= strlen("hi")); ok1(d->bytes <= strlen("hi"));
......
...@@ -109,7 +109,7 @@ int main(void) ...@@ -109,7 +109,7 @@ int main(void)
free(d); free(d);
exit(0); exit(0);
} }
ok1(io_loop() == d); ok1(io_loop(NULL, NULL) == d);
ok1(d->state == 2); ok1(d->state == 2);
ok1(d->bytes > 0); ok1(d->bytes > 0);
ok1(d->bytes <= 1024*1024); ok1(d->bytes <= 1024*1024);
......
...@@ -110,7 +110,7 @@ int main(void) ...@@ -110,7 +110,7 @@ int main(void)
free(d); free(d);
exit(0); exit(0);
} }
ok1(io_loop() == d); ok1(io_loop(NULL, NULL) == d);
ok1(d->state == 2); ok1(d->state == 2);
ok1(wait(&status)); ok1(wait(&status));
......
...@@ -143,7 +143,7 @@ int main(void) ...@@ -143,7 +143,7 @@ int main(void)
} }
freeaddrinfo(addrinfo); freeaddrinfo(addrinfo);
ok1(io_loop() == d); ok1(io_loop(NULL, NULL) == d);
ok1(d->state == 4); ok1(d->state == 4);
ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
free(d); free(d);
......
...@@ -107,11 +107,11 @@ int main(void) ...@@ -107,11 +107,11 @@ int main(void)
exit(0); exit(0);
} }
freeaddrinfo(addrinfo); freeaddrinfo(addrinfo);
ok1(io_loop() == d); ok1(io_loop(NULL, NULL) == d);
ok1(d->state == 1); ok1(d->state == 1);
io_close_listener(l); io_close_listener(l);
ok1(io_loop() == NULL); ok1(io_loop(NULL, NULL) == NULL);
ok1(d->state == 3); ok1(d->state == 3);
ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
free(d); free(d);
......
...@@ -52,7 +52,7 @@ int main(void) ...@@ -52,7 +52,7 @@ int main(void)
exit(0); exit(0);
} }
ok1(io_loop() == NULL); ok1(io_loop(NULL, NULL) == NULL);
ok1(memcmp(buf, "hello there world", 16) == 0); ok1(memcmp(buf, "hello there world", 16) == 0);
/* This exits depending on whether all tests passed */ /* This exits depending on whether all tests passed */
......
...@@ -41,7 +41,7 @@ int main(void) ...@@ -41,7 +41,7 @@ int main(void)
conn = io_new_conn(NULL, fds[0], init_waiter, NULL); conn = io_new_conn(NULL, fds[0], init_waiter, NULL);
io_new_conn(conn, fds[1], init_writer, conn); io_new_conn(conn, fds[1], init_writer, conn);
ok1(io_loop() == NULL); ok1(io_loop(NULL, NULL) == NULL);
ok1(memcmp(inbuf, "EASYTEST", sizeof(inbuf)) == 0); ok1(memcmp(inbuf, "EASYTEST", sizeof(inbuf)) == 0);
/* This exits depending on whether all tests passed */ /* This exits depending on whether all tests passed */
......
...@@ -99,7 +99,7 @@ int main(void) ...@@ -99,7 +99,7 @@ int main(void)
addrinfo->ai_protocol); addrinfo->ai_protocol);
ok1(io_new_conn(NULL, fd, setup_connect, addrinfo)); ok1(io_new_conn(NULL, fd, setup_connect, addrinfo));
ok1(io_loop() == NULL); ok1(io_loop(NULL, NULL) == NULL);
ok1(d->state == 2); ok1(d->state == 2);
ok1(d2->state == 2); ok1(d2->state == 2);
......
...@@ -101,7 +101,7 @@ int main(void) ...@@ -101,7 +101,7 @@ int main(void)
ok1(buf[i].writer); ok1(buf[i].writer);
/* They should eventually exit */ /* They should eventually exit */
ok1(io_loop() == NULL); ok1(io_loop(NULL, NULL) == NULL);
for (i = 0; i < NUM; i++) { for (i = 0; i < NUM; i++) {
char b[sizeof(buf[0].buf)]; char b[sizeof(buf[0].buf)];
......
...@@ -119,7 +119,7 @@ int main(void) ...@@ -119,7 +119,7 @@ int main(void)
exit(0); exit(0);
} }
freeaddrinfo(addrinfo); freeaddrinfo(addrinfo);
ok1(io_loop() == NULL); ok1(io_loop(NULL, NULL) == NULL);
ok1(d->state == 4); ok1(d->state == 4);
ok1(d->done == 2); ok1(d->done == 2);
ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
......
...@@ -23,7 +23,7 @@ int main(void) ...@@ -23,7 +23,7 @@ int main(void)
ok1(pipe(fds) == 0); ok1(pipe(fds) == 0);
io_new_conn(NULL, fds[0], setup_waiter, &status); io_new_conn(NULL, fds[0], setup_waiter, &status);
io_loop(); io_loop(NULL, NULL);
exit(1); exit(1);
} }
......
...@@ -125,7 +125,7 @@ int main(void) ...@@ -125,7 +125,7 @@ int main(void)
exit(0); exit(0);
} }
freeaddrinfo(addrinfo); freeaddrinfo(addrinfo);
ok1(io_loop() == NULL); ok1(io_loop(NULL, NULL) == NULL);
ok1(d->state == 5); ok1(d->state == 5);
ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
free(d); free(d);
......
...@@ -3,55 +3,49 @@ ...@@ -3,55 +3,49 @@
#include <ccan/io/poll.c> #include <ccan/io/poll.c>
#include <ccan/io/io.c> #include <ccan/io/io.c>
#include <ccan/tap/tap.h> #include <ccan/tap/tap.h>
#include <ccan/time/time.h>
#include <sys/wait.h> #include <sys/wait.h>
#include <stdio.h> #include <stdio.h>
#include <unistd.h> #include <unistd.h>
#if 0
#ifndef PORT #ifndef PORT
#define PORT "65015" #define PORT "65015"
#endif #endif
struct data { struct data {
struct timers timers;
int state; int state;
struct io_conn *conn;
struct timer timer;
int timeout_usec; int timeout_usec;
bool timed_out;
char buf[4]; char buf[4];
}; };
static void finish_ok(struct io_conn *conn, struct data *d)
static struct io_plan no_timeout(struct io_conn *conn, struct data *d)
{ {
ok1(d->state == 1);
d->state++; d->state++;
return io_close(); io_break(d);
} }
static struct io_plan timeout(struct io_conn *conn, struct data *d) static struct io_plan *no_timeout(struct io_conn *conn, struct data *d)
{ {
ok1(d->state == 1); ok1(d->state == 1);
d->state++; d->state++;
d->timed_out = true; return io_close(conn);
return io_close();
}
static void finish_ok(struct io_conn *conn, struct data *d)
{
ok1(d->state == 2);
d->state++;
io_break(d);
} }
static void init_conn(int fd, struct data *d) static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
{ {
struct io_conn *conn;
ok1(d->state == 0); ok1(d->state == 0);
d->state++; d->state++;
conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), no_timeout, d)); d->conn = conn;
io_set_finish(conn, finish_ok, d); io_set_finish(conn, finish_ok, d);
io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d);
timer_add(&d->timers, &d->timer,
timeabs_add(time_now(), time_from_usec(d->timeout_usec)));
return io_read(conn, d->buf, sizeof(d->buf), no_timeout, d);
} }
static int make_listen_fd(const char *port, struct addrinfo **info) static int make_listen_fd(const char *port, struct addrinfo **info)
...@@ -91,16 +85,17 @@ int main(void) ...@@ -91,16 +85,17 @@ int main(void)
struct data *d = malloc(sizeof(*d)); struct data *d = malloc(sizeof(*d));
struct addrinfo *addrinfo; struct addrinfo *addrinfo;
struct io_listener *l; struct io_listener *l;
struct list_head expired;
int fd, status; int fd, status;
/* This is how many tests you plan to run */ /* This is how many tests you plan to run */
plan_tests(20); plan_tests(21);
d->state = 0; d->state = 0;
d->timed_out = false;
d->timeout_usec = 100000; d->timeout_usec = 100000;
timers_init(&d->timers, time_now());
fd = make_listen_fd(PORT, &addrinfo); fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0); ok1(fd >= 0);
l = io_new_listener(fd, init_conn, d); l = io_new_listener(NULL, fd, init_conn, d);
ok1(l); ok1(l);
fflush(stdout); fflush(stdout);
...@@ -122,19 +117,31 @@ int main(void) ...@@ -122,19 +117,31 @@ int main(void)
} }
close(fd); close(fd);
freeaddrinfo(addrinfo); freeaddrinfo(addrinfo);
timers_cleanup(&d->timers);
free(d); free(d);
exit(i); exit(i);
} }
ok1(io_loop() == d); ok1(io_loop(&d->timers, &expired) == NULL);
ok1(d->state == 3);
ok1(d->timed_out == true); /* One element, d->timer. */
ok1(list_pop(&expired, struct timer, list) == &d->timer);
ok1(list_empty(&expired));
ok1(d->state == 1);
io_close(d->conn);
/* Finished will be called, d will be returned */
ok1(io_loop(&d->timers, &expired) == d);
ok1(list_empty(&expired));
ok1(d->state == 2);
/* It should have died. */
ok1(wait(&status)); ok1(wait(&status));
ok1(WIFEXITED(status)); ok1(WIFEXITED(status));
ok1(WEXITSTATUS(status) < sizeof(d->buf)); ok1(WEXITSTATUS(status) < sizeof(d->buf));
/* This one shouldn't time out. */ /* This one shouldn't time out. */
d->state = 0; d->state = 0;
d->timed_out = false;
d->timeout_usec = 500000; d->timeout_usec = 500000;
fflush(stdout); fflush(stdout);
...@@ -156,26 +163,22 @@ int main(void) ...@@ -156,26 +163,22 @@ int main(void)
} }
close(fd); close(fd);
freeaddrinfo(addrinfo); freeaddrinfo(addrinfo);
timers_cleanup(&d->timers);
free(d); free(d);
exit(i); exit(i);
} }
ok1(io_loop() == d); ok1(io_loop(&d->timers, &expired) == d);
ok1(d->state == 3); ok1(d->state == 3);
ok1(d->timed_out == false); ok1(list_empty(&expired));
ok1(wait(&status)); ok1(wait(&status));
ok1(WIFEXITED(status)); ok1(WIFEXITED(status));
ok1(WEXITSTATUS(status) >= sizeof(d->buf)); ok1(WEXITSTATUS(status) >= sizeof(d->buf));
io_close_listener(l); io_close_listener(l);
freeaddrinfo(addrinfo); freeaddrinfo(addrinfo);
timers_cleanup(&d->timers);
free(d); free(d);
/* This exits depending on whether all tests passed */ /* This exits depending on whether all tests passed */
return exit_status(); return exit_status();
} }
#else
int main(void)
{
return 0;
}
#endif
...@@ -119,7 +119,7 @@ int main(void) ...@@ -119,7 +119,7 @@ int main(void)
exit(0); exit(0);
} }
freeaddrinfo(addrinfo); freeaddrinfo(addrinfo);
ok1(io_loop() == NULL); ok1(io_loop(NULL, NULL) == NULL);
ok1(d->state == 4); ok1(d->state == 4);
ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0); ok1(memcmp(d->buf, "hellothere", sizeof(d->buf)) == 0);
free(d); free(d);
......
...@@ -166,7 +166,7 @@ int main(void) ...@@ -166,7 +166,7 @@ int main(void)
exit(0); exit(0);
} }
freeaddrinfo(addrinfo); freeaddrinfo(addrinfo);
ok1(io_loop() == pkt); ok1(io_loop(NULL, NULL) == pkt);
ok1(pkt->state == 4); ok1(pkt->state == 4);
ok1(pkt->len == 8); ok1(pkt->len == 8);
ok1(memcmp(pkt->contents, "hithere!", 8) == 0); ok1(memcmp(pkt->contents, "hithere!", 8) == 0);
......
...@@ -109,7 +109,7 @@ int main(void) ...@@ -109,7 +109,7 @@ int main(void)
exit(0); exit(0);
} }
freeaddrinfo(addrinfo); freeaddrinfo(addrinfo);
ok1(io_loop() == &state + 1); ok1(io_loop(NULL, NULL) == &state + 1);
ok1(state == 4); ok1(state == 4);
io_close_listener(l); io_close_listener(l);
ok1(wait(&state)); ok1(wait(&state));
......
...@@ -117,7 +117,7 @@ int main(void) ...@@ -117,7 +117,7 @@ int main(void)
free(d); free(d);
exit(0); exit(0);
} }
ok1(io_loop() == d); ok1(io_loop(NULL, NULL) == d);
ok1(d->state == 2); ok1(d->state == 2);
ok1(wait(&status)); ok1(wait(&status));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment