Commit 922496c3 authored by Juho Snellman's avatar Juho Snellman

Add a facility for limiting number of callbacks to execute in one advance()

- Mark wheel as in the middle of partial tick, and use a special code
  path at start of advance() to pick up where we left off.
- ticks_to_next_event() will return 0 until the partial tick is
  finished.
- The order of operations is such that it's safe to continue scheduling
  further events during this stage.
- No measurable performance hit on the fast path, pretty minimal cost when the
  limit gets hit.
parent 6eda585f
......@@ -15,6 +15,9 @@ instead of some other are:
be triggered. The implementation avoids unnecessary work when an
event is rescheduled, and provides a way for the user specify a
range of acceptable execution times instead of just an exact one.
- Facility for limiting the number of events to execute on a
single invocation, to allow fine grained interleaving of timer
processing and application logic.
- An interface that at least the author finds convenient.
The exact implementation strategy is a hierarchical timer
......@@ -82,7 +85,11 @@ eventually be executed once the time advances far enough with the
***** =TimerWheel::advance(Tick delta, int level = 0)=
Advance the TimerWheel by the specified number of ticks (=delta=), and execute
any events scheduled for execution at or before that time.
any events scheduled for execution at or before that time. The
number of events executed can be restricted using the =max_execute=
parameter. If that limit is reached, the function will return false,
and the excess events will be processed on a subsequent call.
- It is safe to cancel or schedule events from within event callbacks.
- During the execution of the callback the observable event tick will
be the tick it was scheduled to run on; not the tick the clock will
......@@ -90,6 +97,9 @@ any events scheduled for execution at or before that time.
- Events will happen in order; all events scheduled for tick X will
be executed before any event scheduled for tick X+1.
Delta should be non-0. The only exception is if the previous
call to =advance()= returned false.
The =level= parameter is used to trigger timer advances on different
levels of the hierarchy. It will generally not be useful to pass in
any value other than the default 0.
......@@ -117,6 +127,9 @@ executed. If the max parameter is passed, that will be the maximum
tick value that gets returned. The max parameter's value will also
be returned if no events have been scheduled.
Will return 0 if the wheel still has unprocessed events from the
previous call to advance().
The =level= parameter is used to trigger timer advances on different
levels of the hierarchy. It will generally not be useful to pass in
any value other than the default 0.
......
......@@ -3,7 +3,9 @@
// Copyright 2016 Juho Snellman, released under a MIT license (see
// LICENSE).
#include <algorithm>
#include <functional>
#include <vector>
#include "../timer-wheel.h"
......@@ -304,6 +306,79 @@ bool test_single_timer_random() {
return true;
}
bool test_maxexec() {
typedef std::function<void()> Callback;
TimerWheel timers;
int count0 = 0;
int count1 = 0;
TimerEvent<Callback> timer0([&count0] () { ++count0; });
TimerEvent<Callback> timer1a([&count1] () { ++count1; });
TimerEvent<Callback> timer1b([&count1] () { ++count1; });
// Schedule 3 timers to happen at the same time (on 2 different
// wheels).
timers.schedule(&timer1a, 256);
timers.schedule(&timer1b, 256);
timers.advance(1);
timers.schedule(&timer0, 255);
timers.advance(254);
EXPECT_INTEQ(count0, 0);
EXPECT_INTEQ(count1, 0);
EXPECT_INTEQ(timers.ticks_to_next_event(), 1);
EXPECT_INTEQ(timers.now(), 255);
// Then run them one by one.
EXPECT(!timers.advance(1, 1));
EXPECT_INTEQ(count0, 0);
EXPECT_INTEQ(count1, 1);
EXPECT_INTEQ(timers.ticks_to_next_event(), 0);
// Note that time has already advanced.
EXPECT_INTEQ(timers.now(), 256);
EXPECT(!timers.advance(0, 1));
EXPECT_INTEQ(count0, 0);
EXPECT_INTEQ(count1, 2);
EXPECT(!timers.advance(0, 1));
EXPECT_INTEQ(count0, 1);
EXPECT_INTEQ(count1, 2);
// We have not finished the tick yet, since the last call exactly
// drained the queue. But the next call will finish the tick while
// doing no actual work.
EXPECT_INTEQ(timers.ticks_to_next_event(100), 0);
EXPECT(timers.advance(0, 1));
EXPECT_INTEQ(timers.ticks_to_next_event(100), 100);
// Test scheduling while wheel is in the middle of partial tick handling.
timers.schedule(&timer1a, 256);
timers.advance(1);
timers.schedule(&timer0, 255);
timers.advance(254);
EXPECT(!timers.advance(1, 1));
// Now in the middle of the tick.
std::vector<bool> done(false, 512);
std::vector<TimerEvent<Callback>*> events;
// Schedule 512 timers, each setting the matching bit in "done".
for (int i = 0; i < done.size(); ++i) {
auto event = new TimerEvent<Callback>([&done, i] () { done[i] = true; });
events.push_back(event);
timers.schedule(event, i + 1);
}
// Close the tick.
EXPECT(timers.advance(0, 100));
// Now check that all 512 timers were scheduled in the right location.
for (int i = 0; i < done.size(); ++i) {
EXPECT_INTEQ(std::count(done.begin(), done.end(), true), i);
EXPECT(!done[i]);
timers.advance(1);
EXPECT(done[i]);
}
return true;
}
class Test {
public:
Test()
......@@ -352,6 +427,7 @@ int main(void) {
TEST(test_ticks_to_next_event);
TEST(test_schedule_in_range);
TEST(test_single_timer_random);
TEST(test_maxexec);
TEST(test_reschedule_from_timer);
TEST(test_timeout_method);
// Test canceling timer from within timer
......
......@@ -18,6 +18,9 @@
// be triggered. The implementation avoids unnecessary work when an
// event is rescheduled, and provides a way for the user specify a
// range of acceptable execution times instead of just an exact one.
// - Facility for limiting the number of events to execute on a
// single invocation, to allow fine grained interleaving of timer
// processing and application logic.
// - An interface that at least the author finds convenient.
//
// The exact implementation strategy is a hierarchical timer
......@@ -218,17 +221,27 @@ public:
for (int i = 0; i < NUM_LEVELS; ++i) {
now_[i] = now >> (WIDTH_BITS * i);
}
ticks_pending_ = 0;
}
// Advance the TimerWheel by the specified number of ticks, and execute
// any events scheduled for execution at or before that time.
// any events scheduled for execution at or before that time. The
// number of events executed can be restricted using the max_execute
// parameter. If that limit is reached, the function will return false,
// and the excess events will be processed on a subsequent call.
//
// - It is safe to cancel or schedule events from within event callbacks.
// - During the execution of the callback the observable event tick will
// be the tick it was scheduled to run on; not the tick the clock will
// be advanced to.
// - Events will happen in order; all events scheduled for tick X will
// be executed before any event scheduled for tick X+1.
void advance(Tick delta, int level = 0);
//
// Delta should be non-0. The only exception is if the previous
// call to advance() returned false.
bool advance(Tick delta,
size_t max_execute=std::numeric_limits<size_t>::max(),
int level = 0);
// Schedule the event to be executed delta ticks from the current time.
// The delta must be non-0.
......@@ -252,6 +265,9 @@ public:
// executed. If the max parameter is passed, that will be the maximum
// tick value that gets returned. The max parameter's value will also
// be returned if no events have been scheduled.
//
// Will return 0 if the wheel still has unprocessed events from the
// previous call to advance().
Tick ticks_to_next_event(const Tick& max = std::numeric_limits<Tick>::max(),
int level = 0);
......@@ -259,6 +275,10 @@ private:
TimerWheel(const TimerWheel& other) = delete;
TimerWheel& operator=(const TimerWheel& other) = delete;
// This handles the actual work of executing event callbacks and
// recursing to the outer wheels.
bool process_current_slot(Tick now, size_t max_execute, int level);
static const int WIDTH_BITS = 8;
static const int NUM_LEVELS = std::ceil(64 / WIDTH_BITS);
static const int MAX_LEVEL = NUM_LEVELS - 1;
......@@ -271,6 +291,9 @@ private:
// such that each slot is separated by exactly one tick even on
// the outermost wheels.
Tick now_[NUM_LEVELS];
// We've done a partial tick advance. This is how many ticks remain
// unprocessed.
Tick ticks_pending_;
TimerWheelSlot slots_[NUM_LEVELS][NUM_SLOTS];
};
......@@ -322,30 +345,84 @@ void TimerEventInterface::cancel() {
relink(NULL);
}
void TimerWheel::advance(Tick delta, int level) {
assert(delta > 0);
bool TimerWheel::advance(Tick delta, size_t max_events, int level) {
if (ticks_pending_) {
if (level == 0) {
// Continue collecting a backlog of ticks to process if
// we're called with non-zero deltas.
ticks_pending_ += delta;
}
// We only partially processed the last tick. Process the
// current slot, rather incrementing like advance() normally
// does.
Tick now = now_[level];
if (!process_current_slot(now, max_events, level)) {
// Outer layers are still not done, propagate that information
// back up.
return false;
}
if (level == 0) {
// The core wheel has been fully processed. We can now close
// down the partial tick and pretend that we've just been
// called with a delta containing both the new and original
// amounts.
delta = (ticks_pending_ - 1);
ticks_pending_ = 0;
} else {
return true;
}
} else {
// Zero deltas are only ok when in the middle of a partially
// processed tick.
assert(delta > 0);
}
while (delta--) {
Tick now = ++now_[level];
size_t slot_index = now & MASK;
auto slot = &slots_[level][slot_index];
if (slot_index == 0 && level < MAX_LEVEL) {
advance(1, level + 1);
if (!process_current_slot(now, max_events, level)) {
ticks_pending_ = (delta + 1);
return false;
}
}
return true;
}
bool TimerWheel::process_current_slot(Tick now, size_t max_events, int level) {
size_t slot_index = now & MASK;
auto slot = &slots_[level][slot_index];
if (slot_index == 0 && level < MAX_LEVEL) {
if (!advance(1, max_events, level + 1)) {
return false;
}
while (slot->events()) {
auto event = slot->pop_event();
if (level > 0) {
assert((now_[0] & MASK) == 0);
if (now_[0] >= event->scheduled_at()) {
event->execute();
} else {
schedule(event,
event->scheduled_at() - now_[0]);
}
while (slot->events()) {
auto event = slot->pop_event();
if (level > 0) {
assert((now_[0] & MASK) == 0);
if (now_[0] >= event->scheduled_at()) {
event->execute();
if (!--max_events) {
return false;
}
} else {
event->execute();
// There's a case to be made that promotion should
// also count as work done. And that would simplify
// this code since the max_events manipulation could
// move to the top of the loop. But it's an order of
// magnitude more expensive to execute a typical
// callback, and promotions will naturally clump while
// events triggering won't.
schedule(event,
event->scheduled_at() - now_[0]);
}
} else {
event->execute();
if (!--max_events) {
return false;
}
}
}
return true;
}
void TimerWheel::schedule(TimerEventInterface* event, Tick delta) {
......@@ -391,6 +468,9 @@ void TimerWheel::schedule_in_range(TimerEventInterface* event,
}
Tick TimerWheel::ticks_to_next_event(const Tick& max, int level) {
if (ticks_pending_) {
return 0;
}
// The actual current time (not the bitshifted time)
Tick now = now_[0];
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment