Commit 580457bd authored by Rusty Russell's avatar Rusty Russell

ccan/io: add io_conn_exclusive and io_conn_out_exclusive.

There are cases where we want to suppress all activity except for a
single fd; we already have ugly io_flush_sync, but this is more
useful and more general.
Signed-off-by: default avatarRusty Russell <rusty@rustcorp.com.au>
parent 0fa318d0
...@@ -8,6 +8,8 @@ ...@@ -8,6 +8,8 @@
struct fd { struct fd {
int fd; int fd;
bool listener; bool listener;
/* We could put these in io_plan, but they pack nicely here */
bool exclusive[2];
size_t backend_info; size_t backend_info;
}; };
...@@ -76,6 +78,7 @@ void cleanup_conn_without_close(struct io_conn *c); ...@@ -76,6 +78,7 @@ void cleanup_conn_without_close(struct io_conn *c);
bool backend_new_always(struct io_plan *plan); bool backend_new_always(struct io_plan *plan);
void backend_new_plan(struct io_conn *conn); void backend_new_plan(struct io_conn *conn);
void backend_plan_done(struct io_conn *conn); void backend_plan_done(struct io_conn *conn);
bool backend_set_exclusive(struct io_plan *plan, bool exclusive);
void backend_wake(const void *wait); void backend_wake(const void *wait);
......
...@@ -119,6 +119,16 @@ struct io_conn *io_new_conn_(const tal_t *ctx, int fd, ...@@ -119,6 +119,16 @@ struct io_conn *io_new_conn_(const tal_t *ctx, int fd,
return conn; return conn;
} }
bool io_conn_exclusive(struct io_conn *conn, bool exclusive)
{
return backend_set_exclusive(&conn->plan[IO_IN], exclusive);
}
bool io_conn_out_exclusive(struct io_conn *conn, bool exclusive)
{
return backend_set_exclusive(&conn->plan[IO_OUT], exclusive);
}
void io_set_finish_(struct io_conn *conn, void io_set_finish_(struct io_conn *conn,
void (*finish)(struct io_conn *, void *), void (*finish)(struct io_conn *, void *),
void *arg) void *arg)
......
...@@ -722,6 +722,35 @@ bool io_plan_out_started(const struct io_conn *conn); ...@@ -722,6 +722,35 @@ bool io_plan_out_started(const struct io_conn *conn);
*/ */
bool io_flush_sync(struct io_conn *conn); bool io_flush_sync(struct io_conn *conn);
/**
* io_conn_exclusive - set/unset an io_conn to exclusively serviced
* @conn: the connection
* @exclusive: whether to be exclusive or not
*
* If any io_conn is set exclusive, then no non-exclusive io_conn (or
* io_listener) will be serviced by io_loop(). If it's a io_duplex io_conn(),
* then io_conn_exclusive() makes the read-side exclusive; io_conn_out_exclusive()
* makes the write-side exclusive.
*
* This allows you to temporarily service only one (or several) fds.
* For example, you might want to flush out one io_conn and not
* receive any new connections or read any otherninput.
*
* Returns true of there any exclusive io_conn remain, otherwise false.
* (This is useful for checking your own logic: dangling exclusive io_conn
* are dangerous!).
*/
bool io_conn_exclusive(struct io_conn *conn, bool exclusive);
/**
* io_conn_out_exclusive - set/unset exclusive on the write-side of a duplex
* @conn: the connection, post io_duplex
* @exclusive: whether to be exclusive or not
*
* See io_conn_exclusive() above.
*/
bool io_conn_out_exclusive(struct io_conn *conn, bool exclusive);
/** /**
* io_fd_block - helper to set an fd blocking/nonblocking. * io_fd_block - helper to set an fd blocking/nonblocking.
* @fd: the file descriptor * @fd: the file descriptor
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
#include <ccan/time/time.h> #include <ccan/time/time.h>
#include <ccan/timer/timer.h> #include <ccan/timer/timer.h>
static size_t num_fds = 0, max_fds = 0, num_waiting = 0, num_always = 0, max_always = 0; static size_t num_fds = 0, max_fds = 0, num_waiting = 0, num_always = 0, max_always = 0, num_exclusive = 0;
static struct pollfd *pollfds = NULL; static struct pollfd *pollfds = NULL;
static struct fd **fds = NULL; static struct fd **fds = NULL;
static struct io_plan **always = NULL; static struct io_plan **always = NULL;
...@@ -64,6 +64,7 @@ static bool add_fd(struct fd *fd, short events) ...@@ -64,6 +64,7 @@ static bool add_fd(struct fd *fd, short events)
pollfds[num_fds].revents = 0; /* In case we're iterating now */ pollfds[num_fds].revents = 0; /* In case we're iterating now */
fds[num_fds] = fd; fds[num_fds] = fd;
fd->backend_info = num_fds; fd->backend_info = num_fds;
fd->exclusive[0] = fd->exclusive[1] = false;
num_fds++; num_fds++;
if (events) if (events)
num_waiting++; num_waiting++;
...@@ -93,6 +94,11 @@ static void del_fd(struct fd *fd) ...@@ -93,6 +94,11 @@ static void del_fd(struct fd *fd)
} }
num_fds--; num_fds--;
fd->backend_info = -1; fd->backend_info = -1;
if (fd->exclusive[IO_IN])
num_exclusive--;
if (fd->exclusive[IO_OUT])
num_exclusive--;
} }
static void destroy_listener(struct io_listener *l) static void destroy_listener(struct io_listener *l)
...@@ -157,12 +163,9 @@ bool backend_new_always(struct io_plan *plan) ...@@ -157,12 +163,9 @@ bool backend_new_always(struct io_plan *plan)
return true; return true;
} }
void backend_new_plan(struct io_conn *conn) static void setup_pfd(struct io_conn *conn, struct pollfd *pfd)
{ {
struct pollfd *pfd = &pollfds[conn->fd.backend_info]; assert(pfd == &pollfds[conn->fd.backend_info]);
if (pfd->events)
num_waiting--;
pfd->events = 0; pfd->events = 0;
if (conn->plan[IO_IN].status == IO_POLLING_NOTSTARTED if (conn->plan[IO_IN].status == IO_POLLING_NOTSTARTED
...@@ -173,13 +176,25 @@ void backend_new_plan(struct io_conn *conn) ...@@ -173,13 +176,25 @@ void backend_new_plan(struct io_conn *conn)
pfd->events |= POLLOUT; pfd->events |= POLLOUT;
if (pfd->events) { if (pfd->events) {
num_waiting++;
pfd->fd = conn->fd.fd; pfd->fd = conn->fd.fd;
} else { } else {
pfd->fd = -conn->fd.fd - 1; pfd->fd = -conn->fd.fd - 1;
} }
} }
void backend_new_plan(struct io_conn *conn)
{
struct pollfd *pfd = &pollfds[conn->fd.backend_info];
if (pfd->events)
num_waiting--;
setup_pfd(conn, pfd);
if (pfd->events)
num_waiting++;
}
void backend_wake(const void *wait) void backend_wake(const void *wait)
{ {
unsigned int i; unsigned int i;
...@@ -250,18 +265,88 @@ static void accept_conn(struct io_listener *l) ...@@ -250,18 +265,88 @@ static void accept_conn(struct io_listener *l)
io_new_conn(l->ctx, fd, l->init, l->arg); io_new_conn(l->ctx, fd, l->init, l->arg);
} }
/* Return pointer to exclusive flag for this plan. */
static bool *exclusive(struct io_plan *plan)
{
struct io_conn *conn;
conn = container_of(plan, struct io_conn, plan[plan->dir]);
return &conn->fd.exclusive[plan->dir];
}
/* For simplicity, we do one always at a time */
static bool handle_always(void) static bool handle_always(void)
{ {
bool ret = false; /* Backwards is simple easier to remove entries */
for (int i = num_always - 1; i >= 0; i--) {
struct io_plan *plan = always[i];
while (num_always > 0) { if (num_exclusive && !*exclusive(plan))
continue;
/* Remove first: it might re-add */ /* Remove first: it might re-add */
struct io_plan *plan = always[num_always-1]; if (i != num_always-1)
always[i] = always[num_always-1];
num_always--; num_always--;
io_do_always(plan); io_do_always(plan);
ret = true; return true;
}
return false;
}
bool backend_set_exclusive(struct io_plan *plan, bool excl)
{
bool *excl_ptr = exclusive(plan);
if (excl != *excl_ptr) {
*excl_ptr = excl;
if (!excl)
num_exclusive--;
else
num_exclusive++;
}
return num_exclusive != 0;
}
/* FIXME: We could do this once at set_exclusive time, and catch everywhere
* else that we manipulate events. */
static void exclude_pollfds(void)
{
if (num_exclusive == 0)
return;
for (size_t i = 0; i < num_fds; i++) {
struct pollfd *pfd = &pollfds[fds[i]->backend_info];
if (!fds[i]->exclusive[IO_IN])
pfd->events &= ~POLLIN;
if (!fds[i]->exclusive[IO_OUT])
pfd->events &= ~POLLOUT;
/* If we're not listening, we don't want error events
* either. */
if (!pfd->events)
pfd->fd = -fds[i]->fd - 1;
}
}
static void restore_pollfds(void)
{
if (num_exclusive == 0)
return;
for (size_t i = 0; i < num_fds; i++) {
struct pollfd *pfd = &pollfds[fds[i]->backend_info];
if (fds[i]->listener) {
pfd->events = POLLIN;
pfd->fd = fds[i]->fd;
} else {
struct io_conn *conn = (void *)fds[i];
setup_pfd(conn, pfd);
}
} }
return ret;
} }
/* This is the main loop. */ /* This is the main loop. */
...@@ -312,7 +397,11 @@ void *io_loop(struct timers *timers, struct timer **expired) ...@@ -312,7 +397,11 @@ void *io_loop(struct timers *timers, struct timer **expired)
} }
} }
/* We do this temporarily, assuming exclusive is unusual */
exclude_pollfds();
r = pollfn(pollfds, num_fds, ms_timeout); r = pollfn(pollfds, num_fds, ms_timeout);
restore_pollfds();
if (r < 0) { if (r < 0) {
/* Signals shouldn't break us, unless they set /* Signals shouldn't break us, unless they set
* io_loop_return. */ * io_loop_return. */
...@@ -325,6 +414,9 @@ void *io_loop(struct timers *timers, struct timer **expired) ...@@ -325,6 +414,9 @@ void *io_loop(struct timers *timers, struct timer **expired)
struct io_conn *c = (void *)fds[i]; struct io_conn *c = (void *)fds[i];
int events = pollfds[i].revents; int events = pollfds[i].revents;
/* Clear so we don't get confused if exclusive next time */
pollfds[i].revents = 0;
if (r == 0) if (r == 0)
break; break;
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
#include <sys/wait.h> #include <sys/wait.h>
#include <stdio.h> #include <stdio.h>
#define PORT "65020" #define PORT "65041"
/* Should be looking to read from one fd. */ /* Should be looking to read from one fd. */
static int mypoll(struct pollfd *fds, nfds_t nfds, int timeout) static int mypoll(struct pollfd *fds, nfds_t nfds, int timeout)
......
#include <ccan/io/io.h>
/* Include the C files directly. */
#include <ccan/io/poll.c>
#include <ccan/io/io.c>
#include <ccan/tap/tap.h>
#include <sys/wait.h>
#include <stdio.h>
#define PORT "65046"
struct data {
struct io_listener *l;
int num_clients;
char *pattern;
char buf[30];
size_t buflen;
};
static struct io_plan *read_more(struct io_conn *conn, struct data *d);
static struct io_plan *read_done(struct io_conn *conn, struct data *d)
{
tal_resize(&d->pattern, tal_count(d->pattern) + strlen(d->buf));
strcat(d->pattern, d->buf);
return read_more(conn, d);
}
static struct io_plan *read_more(struct io_conn *conn, struct data *d)
{
memset(d->buf, 0, sizeof(d->buf));
return io_read_partial(conn, d->buf, sizeof(d->buf), &d->buflen,
read_done, d);
}
static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
{
d->num_clients++;
if (d->num_clients == 2) {
/* Free listener so when conns close we exit io_loop */
io_close_listener(d->l);
/* Set priority to second connection. */
ok1(io_conn_exclusive(conn, true) == true);
}
return read_more(conn, d);
}
static int make_listen_fd(const char *port, struct addrinfo **info)
{
int fd, on = 1;
struct addrinfo *addrinfo, hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
hints.ai_protocol = 0;
if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
return -1;
fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
addrinfo->ai_protocol);
if (fd < 0)
return -1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
close(fd);
return -1;
}
if (listen(fd, 1) != 0) {
close(fd);
return -1;
}
*info = addrinfo;
return fd;
}
int main(void)
{
struct addrinfo *addrinfo = NULL;
int fd, status;
struct data d;
d.num_clients = 0;
/* This is how many tests you plan to run */
plan_tests(8);
fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0);
d.l = io_new_listener(NULL, fd, init_conn, &d);
ok1(d.l);
fflush(stdout);
if (!fork()) {
int fd1, fd2;
io_close_listener(d.l);
fd1 = socket(addrinfo->ai_family, addrinfo->ai_socktype,
addrinfo->ai_protocol);
if (fd1 < 0)
exit(1);
if (connect(fd1, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
exit(2);
if (write(fd1, "1hellothere", strlen("1hellothere")) != strlen("1hellothere"))
exit(3);
fd2 = socket(addrinfo->ai_family, addrinfo->ai_socktype,
addrinfo->ai_protocol);
if (fd2 < 0)
exit(1);
if (connect(fd2, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
exit(2);
signal(SIGPIPE, SIG_IGN);
sleep(1);
if (write(fd1, "1helloagain", strlen("1helloagain")) != strlen("1helloagain"))
exit(4);
sleep(1);
if (write(fd2, "2hellonew", strlen("2hellonew")) != strlen("2hellonew"))
exit(5);
close(fd1);
close(fd2);
freeaddrinfo(addrinfo);
exit(0);
}
freeaddrinfo(addrinfo);
d.pattern = tal_arrz(NULL, char, 1);
ok1(io_loop(NULL, NULL) == NULL);
if (!ok1(strcmp(d.pattern, "1hellothere2hellonew1helloagain") == 0))
printf("d.patterns = %s\n", d.pattern);
tal_free(d.pattern);
ok1(wait(&status));
ok1(WIFEXITED(status));
ok1(WEXITSTATUS(status) == 0);
/* This exits depending on whether all tests passed */
return exit_status();
}
#include <ccan/io/io.h>
/* Include the C files directly. */
#include <ccan/io/poll.c>
#include <ccan/io/io.c>
#include <ccan/tap/tap.h>
#include <sys/wait.h>
#include <stdio.h>
#define PORT "65047"
struct data {
struct io_listener *l;
char *pattern;
char buf[30];
size_t buflen;
};
static struct io_plan *read_more(struct io_conn *conn, struct data *d);
static struct io_plan *write_more(struct io_conn *conn, struct data *d);
static struct io_plan *read_done(struct io_conn *conn, struct data *d)
{
tal_resize(&d->pattern, tal_count(d->pattern) + 1 + strlen(d->buf));
strcat(d->pattern, "<");
strcat(d->pattern, d->buf);
return read_more(conn, d);
}
static struct io_plan *read_more(struct io_conn *conn, struct data *d)
{
memset(d->buf, 0, sizeof(d->buf));
return io_read_partial(conn, d->buf, sizeof(d->buf), &d->buflen,
read_done, d);
}
static struct io_plan *write_done(struct io_conn *conn, struct data *d)
{
tal_resize(&d->pattern, tal_count(d->pattern) + 1);
strcat(d->pattern, ">");
return write_more(conn, d);
}
static struct io_plan *write_more(struct io_conn *conn, struct data *d)
{
return io_write_partial(conn, d->buf, 1, &d->buflen,
write_done, d);
}
static struct io_plan *read_priority_init(struct io_conn *conn, struct data *d)
{
/* This should suppress the write */
ok1(io_conn_exclusive(conn, true));
return read_more(conn, d);
}
static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
{
/* Free listener so when conns close we exit io_loop */
io_close_listener(d->l);
return io_duplex(conn, read_priority_init(conn, d), write_more(conn, d));
}
static int make_listen_fd(const char *port, struct addrinfo **info)
{
int fd, on = 1;
struct addrinfo *addrinfo, hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
hints.ai_protocol = 0;
if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
return -1;
fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
addrinfo->ai_protocol);
if (fd < 0)
return -1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
close(fd);
return -1;
}
if (listen(fd, 1) != 0) {
close(fd);
return -1;
}
*info = addrinfo;
return fd;
}
int main(void)
{
struct addrinfo *addrinfo = NULL;
int fd, status;
struct data d;
/* This is how many tests you plan to run */
plan_tests(8);
fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0);
d.l = io_new_listener(NULL, fd, init_conn, &d);
ok1(d.l);
fflush(stdout);
if (!fork()) {
io_close_listener(d.l);
fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
addrinfo->ai_protocol);
if (fd < 0)
exit(1);
if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
exit(2);
signal(SIGPIPE, SIG_IGN);
if (write(fd, "1hellothere", strlen("1hellothere")) != strlen("1hellothere"))
exit(3);
sleep(1);
if (write(fd, "1helloagain", strlen("1helloagain")) != strlen("1helloagain"))
exit(4);
close(fd);
freeaddrinfo(addrinfo);
exit(0);
}
freeaddrinfo(addrinfo);
d.pattern = tal_arrz(NULL, char, 1);
ok1(io_loop(NULL, NULL) == NULL);
/* No trace of writes */
ok1(strcmp(d.pattern, "<1hellothere<1helloagain") == 0);
tal_free(d.pattern);
ok1(wait(&status));
ok1(WIFEXITED(status));
ok1(WEXITSTATUS(status) == 0);
/* This exits depending on whether all tests passed */
return exit_status();
}
#include <ccan/io/io.h>
/* Include the C files directly. */
#include <ccan/io/poll.c>
#include <ccan/io/io.c>
#include <ccan/tap/tap.h>
#include <sys/wait.h>
#include <stdio.h>
#define PORT "65048"
struct data {
struct io_listener *l;
char *pattern;
char buf[30];
size_t buflen;
};
static struct io_plan *read_more(struct io_conn *conn, struct data *d);
static struct io_plan *write_more(struct io_conn *conn, struct data *d);
static struct io_plan *read_done(struct io_conn *conn, struct data *d)
{
tal_resize(&d->pattern, tal_count(d->pattern) + 1 + strlen(d->buf));
strcat(d->pattern, "<");
strcat(d->pattern, d->buf);
return read_more(conn, d);
}
static struct io_plan *read_more(struct io_conn *conn, struct data *d)
{
memset(d->buf, 0, sizeof(d->buf));
return io_read_partial(conn, d->buf, sizeof(d->buf), &d->buflen,
read_done, d);
}
static struct io_plan *write_done(struct io_conn *conn, struct data *d)
{
tal_resize(&d->pattern, tal_count(d->pattern) + 1);
strcat(d->pattern, ">");
return write_more(conn, d);
}
static struct io_plan *write_more(struct io_conn *conn, struct data *d)
{
return io_write_partial(conn, d->buf, 1, &d->buflen,
write_done, d);
}
static struct io_plan *write_priority_init(struct io_conn *conn, struct data *d)
{
/* This should suppress the read */
ok1(io_conn_out_exclusive(conn, true));
return write_more(conn, d);
}
static struct io_plan *init_conn(struct io_conn *conn, struct data *d)
{
/* Free listener so when conns close we exit io_loop */
io_close_listener(d->l);
return io_duplex(conn, read_more(conn, d), write_priority_init(conn, d));
}
static int make_listen_fd(const char *port, struct addrinfo **info)
{
int fd, on = 1;
struct addrinfo *addrinfo, hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
hints.ai_protocol = 0;
if (getaddrinfo(NULL, port, &hints, &addrinfo) != 0)
return -1;
fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
addrinfo->ai_protocol);
if (fd < 0)
return -1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0) {
close(fd);
return -1;
}
if (listen(fd, 1) != 0) {
close(fd);
return -1;
}
*info = addrinfo;
return fd;
}
int main(void)
{
struct addrinfo *addrinfo = NULL;
int fd, status;
struct data d;
/* This is how many tests you plan to run */
plan_tests(8);
fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0);
d.l = io_new_listener(NULL, fd, init_conn, &d);
ok1(d.l);
fflush(stdout);
if (!fork()) {
io_close_listener(d.l);
fd = socket(addrinfo->ai_family, addrinfo->ai_socktype,
addrinfo->ai_protocol);
if (fd < 0)
exit(1);
if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) != 0)
exit(2);
signal(SIGPIPE, SIG_IGN);
if (write(fd, "1hellothere", strlen("1hellothere")) != strlen("1hellothere"))
exit(3);
sleep(1);
if (write(fd, "1helloagain", strlen("1helloagain")) != strlen("1helloagain"))
exit(4);
close(fd);
freeaddrinfo(addrinfo);
exit(0);
}
freeaddrinfo(addrinfo);
d.pattern = tal_arrz(NULL, char, 1);
ok1(io_loop(NULL, NULL) == NULL);
/* No trace of reads */
ok1(strspn(d.pattern, ">") == strlen(d.pattern));
tal_free(d.pattern);
ok1(wait(&status));
ok1(WIFEXITED(status));
ok1(WEXITSTATUS(status) == 0);
/* This exits depending on whether all tests passed */
return exit_status();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment