Commit f69389e6 authored by Rusty Russell's avatar Rusty Russell

lbalance: new module for load balancing

parent 18631151
../../licenses/GPL-3
\ No newline at end of file
#include "config.h"
#include <string.h>
/**
* lbalance - helpers for loadbalancing parallel tasks
*
* This code helps when you have a large number of one-shot tasks; it tries
* to determine the maximum amount of useful parallelism.
*
* License: GPL
* Author: Rusty Russell <rusty@rustcorp.com.au>
*/
int main(int argc, char *argv[])
{
/* Expect exactly one argument */
if (argc != 2)
return 1;
if (strcmp(argv[1], "depends") == 0) {
printf("ccan/tlist\n");
return 0;
}
return 1;
}
#include <ccan/lbalance/lbalance.h>
#include <ccan/tlist/tlist.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <unistd.h>
#include <errno.h>
#include <assert.h>
#include <stdlib.h>
/* Define tlist_lbalance_task */
TLIST_TYPE(lbalance_task, struct lbalance_task);
struct stats {
/* How many stats of for this value do we have? */
unsigned int num_stats;
/* What was our total work rate? */
float work_rate;
};
struct lbalance {
struct tlist_lbalance_task tasks;
unsigned int num_tasks;
/* We figured out how many we want to run. */
unsigned int target;
/* We need to recalc once a report comes in via lbalance_task_free. */
bool target_uptodate;
/* Integral of how many tasks were running so far */
struct timeval prev_tasks_time;
float tasks_sum;
/* For differential rusage. */
struct rusage prev_usage;
/* How many stats we have collected (we invalidate old ones). */
unsigned int total_stats;
/* Array of stats, indexed by number of tasks we were running. */
unsigned int max_stats;
struct stats *stats;
};
struct lbalance_task {
struct lbalance *lb;
struct list_node list;
/* The time this task started */
struct timeval start;
float tasks_sum_start;
};
struct lbalance *lbalance_new(void)
{
struct lbalance *lb = malloc(sizeof *lb);
if (!lb)
return NULL;
tlist_init(&lb->tasks);
lb->num_tasks = 0;
gettimeofday(&lb->prev_tasks_time, NULL);
lb->tasks_sum = 0.0;
getrusage(RUSAGE_CHILDREN, &lb->prev_usage);
lb->max_stats = 1;
lb->stats = malloc(sizeof(lb->stats[0]) * lb->max_stats);
if (!lb->stats) {
free(lb);
return NULL;
}
lb->stats[0].num_stats = 0;
lb->stats[0].work_rate = 0.0;
lb->total_stats = 0;
/* Start with # CPUS as a guess. */
lb->target = -1L;
#ifdef _SC_NPROCESSORS_ONLN
lb->target = sysconf(_SC_NPROCESSORS_ONLN);
#elif defined(_SC_NPROCESSORS_CONF)
if (lb->target == (unsigned int)-1L)
lb->target = sysconf(_SC_NPROCESSORS_CONF);
#endif
/* Otherwise, two is a good number. */
if (lb->target == (unsigned int)-1L || lb->target < 2)
lb->target = 2;
lb->target_uptodate = true;
return lb;
}
/* Return time differences in usec */
static float timeval_sub(struct timeval recent, struct timeval old)
{
float diff;
if (old.tv_usec > recent.tv_usec) {
diff = 1000000 + recent.tv_usec - old.tv_usec;
recent.tv_sec--;
} else
diff = recent.tv_usec - old.tv_usec;
diff += (float)(recent.tv_sec - old.tv_sec) * 1000000;
return diff;
}
/* There were num_tasks running between prev_tasks_time and now. */
static void update_tasks_sum(struct lbalance *lb,
const struct timeval *now)
{
lb->tasks_sum += timeval_sub(*now, lb->prev_tasks_time)
* lb->num_tasks;
lb->prev_tasks_time = *now;
}
struct lbalance_task *lbalance_task_new(struct lbalance *lb)
{
struct lbalance_task *task = malloc(sizeof *task);
if (!task)
return NULL;
if (lb->num_tasks + 1 == lb->max_stats) {
struct stats *s = realloc(lb->stats,
sizeof(*s) * (lb->max_stats + 1));
if (!s) {
free(task);
return NULL;
}
lb->stats = s;
lb->stats[lb->max_stats].num_stats = 0;
lb->stats[lb->max_stats].work_rate = 0.0;
lb->max_stats++;
}
task->lb = lb;
gettimeofday(&task->start, NULL);
/* Record that we ran num_tasks up until now. */
update_tasks_sum(lb, &task->start);
task->tasks_sum_start = lb->tasks_sum;
tlist_add_tail(&lb->tasks, task, list);
lb->num_tasks++;
return task;
}
/* We slowly erase old stats, once we have enough. */
static void degrade_stats(struct lbalance *lb)
{
unsigned int i;
if (lb->total_stats < lb->max_stats * 16)
return;
#if 0
fprintf(stderr, ".");
#endif
for (i = 0; i < lb->max_stats; i++) {
struct stats *s = &lb->stats[i];
unsigned int stats_lost = (s->num_stats + 1) / 2;
s->work_rate *= (float)(s->num_stats - stats_lost)
/ s->num_stats;
s->num_stats -= stats_lost;
lb->total_stats -= stats_lost;
if (s->num_stats == 0)
s->work_rate = 0.0;
}
}
static void add_to_stats(struct lbalance *lb,
unsigned int num_tasks,
float work_rate)
{
#if 0
fprintf(stderr, "With %.2f running, work rate was %.5f\n",
num_tasks, work_rate);
#endif
assert(num_tasks >= 1);
assert(num_tasks < lb->max_stats);
lb->stats[num_tasks].num_stats++;
lb->stats[num_tasks].work_rate += work_rate;
lb->total_stats++;
lb->target_uptodate = false;
}
void lbalance_task_free(struct lbalance_task *task,
const struct rusage *usage)
{
float work_done, duration;
unsigned int num_tasks;
struct timeval now;
struct rusage ru;
gettimeofday(&now, NULL);
duration = timeval_sub(now, task->start);
getrusage(RUSAGE_CHILDREN, &ru);
if (usage) {
work_done = usage->ru_utime.tv_usec + usage->ru_stime.tv_usec
+ (usage->ru_utime.tv_sec + usage->ru_stime.tv_sec)
* 1000000;
} else {
/* Take difference in rusage as rusage of that task. */
work_done = timeval_sub(ru.ru_utime,
task->lb->prev_usage.ru_utime)
+ timeval_sub(ru.ru_stime,
task->lb->prev_usage.ru_utime);
}
/* Update previous usage. */
task->lb->prev_usage = ru;
/* Record that we ran num_tasks up until now. */
update_tasks_sum(task->lb, &now);
/* So, on average, how many tasks were running during this time? */
num_tasks = (task->lb->tasks_sum - task->tasks_sum_start)
/ duration + 0.5;
/* Record the work rate for that many tasks. */
add_to_stats(task->lb, num_tasks, work_done / duration);
/* We throw away old stats. */
degrade_stats(task->lb);
/* We need to recalculate the target. */
task->lb->target_uptodate = false;
/* Remove this task. */
tlist_del_from(&task->lb->tasks, task, list);
task->lb->num_tasks--;
free(task);
}
/* We look for the point where the work rate starts to drop. Say you have
* 4 cpus, we'd expect the work rate for 5 processes to drop 20%.
*
* If we're within 1/4 of that ideal ratio, we assume it's still
* optimal. Any drop of more than 1/2 is interpreted as the point we
* are overloaded. */
static unsigned int best_target(const struct lbalance *lb)
{
unsigned int i, best = 0, found_drop = 0;
float best_f_max = -1.0, cliff = -1.0;
#if 0
for (i = 1; i < lb->max_stats; i++) {
printf("%u: %f (%u)\n", i,
lb->stats[i].work_rate / lb->stats[i].num_stats,
lb->stats[i].num_stats);
}
#endif
for (i = 1; i < lb->max_stats; i++) {
float f;
if (!lb->stats[i].num_stats)
f = 0;
else
f = lb->stats[i].work_rate / lb->stats[i].num_stats;
if (f > best_f_max) {
#if 0
printf("Best is %i\n", i);
#endif
best_f_max = f - (f / (i + 1)) / 4;
cliff = f - (f / (i + 1)) / 2;
best = i;
found_drop = 0;
} else if (!found_drop && f < cliff) {
#if 0
printf("Found drop at %i\n", i);
#endif
found_drop = i;
}
}
if (found_drop) {
return found_drop - 1;
}
return i - 1;
}
static unsigned int calculate_target(struct lbalance *lb)
{
unsigned int target;
target = best_target(lb);
/* Jitter if the adjacent ones are unknown. */
if (target >= lb->max_stats || lb->stats[target].num_stats == 0)
return target;
if (target + 1 == lb->max_stats || lb->stats[target+1].num_stats == 0)
return target + 1;
if (target > 1 && lb->stats[target-1].num_stats == 0)
return target - 1;
return target;
}
unsigned lbalance_target(struct lbalance *lb)
{
if (!lb->target_uptodate) {
lb->target = calculate_target(lb);
lb->target_uptodate = true;
}
return lb->target;
}
void lbalance_free(struct lbalance *lb)
{
struct lbalance_task *task;
while ((task = tlist_top(&lb->tasks, struct lbalance_task, list))) {
assert(task->lb == lb);
tlist_del_from(&lb->tasks, task, list);
lb->num_tasks--;
free(task);
}
assert(lb->num_tasks == 0);
free(lb->stats);
free(lb);
}
#ifndef CCAN_LBALANCE_H
#define CCAN_LBALANCE_H
#include "config.h"
struct lbalance;
struct lbalance_task;
struct timeval;
struct rusage;
/**
* lbalance_new - initialize a load balancing structure.
*/
struct lbalance *lbalance_new(void);
/**
* lbalance_task_new - mark the starting of a new task.
* @lbalance: the load balancer from lbalance_new.
*/
struct lbalance_task *lbalance_task_new(struct lbalance *lbalance);
/**
* lbalance_task_free - mark the completion of a task.
* @task: the lbalance_task from lbalance_task_new, which will be freed.
* @usage: the resource usage for that task (or NULL).
*
* If @usage is NULL, you must have already wait()ed for the child so
* that lbalance_task_free() can derive it from the difference in
* getrusage() for the child processes.
*
* Otherwise, lbalance_task_free() is a noop, which is useful for failure
* paths.
*/
void lbalance_task_free(struct lbalance_task *task,
const struct rusage *usage);
/**
* lbalance_target - how many tasks in parallel are recommended?
* @lbalance: the load balancer from lbalance_new.
*
* Normally you keep creating tasks until this limit is reached. It's
* updated by stats from lbalance_task_free.
*/
unsigned lbalance_target(struct lbalance *lbalance);
/**
* lbalance_free - free a load balancing structure.
* @lbalance: the load balancer from lbalance_new.
*
* Also frees any tasks still attached.
*/
void lbalance_free(struct lbalance *lbalance);
#endif /* CCAN_LBALANCE_H */
#include "config.h"
#include <sys/time.h>
#include <sys/resource.h>
#include <unistd.h>
#include <errno.h>
static int fake_gettimeofday(struct timeval *tv, struct timezone *tz);
static int fake_getrusage(int who, struct rusage *usage);
#define gettimeofday fake_gettimeofday
#define getrusage fake_getrusage
#include <ccan/lbalance/lbalance.c>
#include <ccan/tap/tap.h>
static unsigned faketime_ms = 0;
static struct rusage total_usage;
static int fake_gettimeofday(struct timeval *tv, struct timezone *tz)
{
assert(tz == NULL);
tv->tv_usec = (faketime_ms % 1000) * 1000;
tv->tv_sec = faketime_ms / 1000;
return 0;
}
static int fake_getrusage(int who, struct rusage *usage)
{
assert(who == RUSAGE_CHILDREN);
*usage = total_usage;
return 0;
}
static void test_optimum(struct lbalance *lb, unsigned int optimum)
{
unsigned int j, i, num_tasks = 0, usec, num_counted = 0;
float average;
struct lbalance_task *tasks[1000];
for (j = 0; j < 1000; j++) {
diag("lbalance_target is %u\n", lbalance_target(lb));
/* We measure average once we try optimum once. */
if (lbalance_target(lb) == optimum && num_counted == 0) {
average = lbalance_target(lb);
num_counted = 1;
} else if (num_counted) {
average += lbalance_target(lb);
num_counted++;
}
/* Create tasks until we reach target. */
for (i = 0; i < lbalance_target(lb); i++) {
tasks[i] = lbalance_task_new(lb);
}
num_tasks = i;
faketime_ms += 100;
/* If we're under optimum, set utilization to 100% */
if (num_tasks <= optimum) {
usec = 100000;
} else {
usec = 100000 * optimum / num_tasks;
}
for (i = 0; i < num_tasks; i++) {
total_usage.ru_utime.tv_usec += usec / 2;
if (total_usage.ru_utime.tv_usec > 1000000) {
total_usage.ru_utime.tv_usec -= 1000000;
total_usage.ru_utime.tv_sec++;
}
total_usage.ru_stime.tv_usec += usec / 2;
if (total_usage.ru_stime.tv_usec > 1000000) {
total_usage.ru_stime.tv_usec -= 1000000;
total_usage.ru_stime.tv_sec++;
}
lbalance_task_free(tasks[i], NULL);
}
}
/* We should have stayed close to optimum. */
ok1(num_counted && (int)(average / num_counted + 0.5) == optimum);
}
int main(void)
{
struct lbalance *lb;
plan_tests(4);
lb = lbalance_new();
test_optimum(lb, 1);
test_optimum(lb, 2);
test_optimum(lb, 4);
test_optimum(lb, 64);
lbalance_free(lb);
return exit_status();
}
#! /usr/bin/make
MODULES=../../jmap.o ../../time.o
CFLAGS=-I../../.. -g #-O2
LDFLAGS=-lJudy
lbalance: lbalance.c $(MODULES)
$(MODULES):
make -C ../../.. $(patsubst ../../%.o, ccan/%.o, $@) EXCLUDE=
clean:
rm -f lbalance $(MODULES)
#include <ccan/lbalance/lbalance.h>
#include <ccan/lbalance/lbalance.c>
#include <ccan/time/time.h>
#include <ccan/jmap/jmap_type.h>
#include <stdio.h>
#include <err.h>
/* Defines struct jmap_task. */
JMAP_DEFINE_UINTIDX_TYPE(struct lbalance_task, task);
/* Figure out how many loops we need to run for about 1 second. */
static unsigned long burn_count;
static void calibrate_burn_cpu(void)
{
struct timeval start = time_now();
while (time_less(time_now(), time_add(start, time_from_msec(1000))))
burn_count++;
printf("Burn count = %lu\n", burn_count);
}
static void burn_cpu(void)
{
unsigned int i, after = 0;
struct timeval start = time_now();
/* We do a loop similar to the calibrate_burn_cpu loop. */
for (i = 0; i < burn_count; i++) {
after += time_less(time_now(),
time_add(start, time_from_msec(1000)));
}
/* We use the result so the compiler can't discard it. */
exit(after);
}
static pid_t spawn(char *args[])
{
pid_t pid = fork();
if (pid == -1)
err(1, "forking");
if (pid == 0) {
if (!args[0])
burn_cpu();
execvp(args[0], args);
err(1, "exec failed");
}
return pid;
}
int main(int argc, char *argv[])
{
unsigned int i, num, fixed_target = 0, num_done = 0, num_running = 0;
struct lbalance *lb;
struct jmap_task *tasks = jmap_task_new();
if (argc < 2) {
fprintf(stderr,
"Usage: lbalance --fixed=<num> <num> [<command>...]\n"
"OR: lbalance <num> [<command>...]\n");
exit(1);
}
if (strncmp(argv[1], "--fixed=", strlen("--fixed=")) == 0) {
fixed_target = atoi(argv[1] + strlen("--fixed="));
if (!fixed_target)
errx(1, "Need positive number after --fixed");
argv++;
argc--;
lb = NULL;
} else {
lb = lbalance_new();
}
num = atoi(argv[1]);
argv++;
argc--;
if (!argv[1])
calibrate_burn_cpu();
while (num_done < num) {
unsigned int j, target = fixed_target;
struct lbalance_task *task;
struct rusage ru;
pid_t pid;
if (lb) {
target = lbalance_target(lb);
printf("(%u)", target);
}
while (num_running < target && num_done + num_running < num) {
pid = spawn(argv+1);
if (lb)
task = lbalance_task_new(lb);
else
task = (void *)1;
jmap_task_add(tasks, pid, task);
num_running++;
printf("+"); fflush(stdout);
}
/* Now wait for something to die! */
pid = wait3(NULL, 0, &ru);
task = jmap_task_get(tasks, pid);
if (lb)
lbalance_task_free(task, &ru);
num_done++;
num_running--;
printf("-"); fflush(stdout);
}
printf("\n");
if (lb)
lbalance_free(lb);
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment