Commit 2ba25c7b authored by Xavier Thompson's avatar Xavier Thompson

Improve scheduler:

- Let thieves remove empty suspended deques from pools
- When enabling empty suspended deques, reinsert them in a pool
- Use direct locking when suspending and enabling
parent b3fb4d4f
...@@ -20,17 +20,19 @@ namespace typon ...@@ -20,17 +20,19 @@ namespace typon
using enum std::memory_order; using enum std::memory_order;
enum State : unsigned char { Empty, Suspended, Resumable };
std::atomic<u64> _top {1}; std::atomic<u64> _top {1};
std::atomic<u64> _bottom {1}; std::atomic<u64> _bottom {1};
std::atomic<ring_buffer *> _buffer { new ring_buffer(3) }; std::atomic<ring_buffer *> _buffer { new ring_buffer(3) };
std::coroutine_handle<> _coroutine; std::coroutine_handle<> _coroutine;
std::atomic<bool> _resumable; std::atomic<State> _state;
Deque() noexcept {} Deque() noexcept {}
Deque(std::coroutine_handle<> coroutine) noexcept Deque(std::coroutine_handle<> coroutine) noexcept
: _coroutine(coroutine) : _coroutine(coroutine)
, _resumable(true) , _state(Resumable)
{} {}
~Deque() ~Deque()
...@@ -107,7 +109,7 @@ namespace typon ...@@ -107,7 +109,7 @@ namespace typon
void suspend(std::coroutine_handle<> coroutine) noexcept void suspend(std::coroutine_handle<> coroutine) noexcept
{ {
_resumable.store(false); _state.store(Suspended);
_coroutine = coroutine; _coroutine = coroutine;
} }
......
...@@ -44,17 +44,17 @@ namespace typon ...@@ -44,17 +44,17 @@ namespace typon
static void push(Continuation task) noexcept static void push(Continuation task) noexcept
{ {
get()._worker[thread_id]._active.load()->push(task); get()._worker[thread_id]._deque.load()->push(task);
} }
static bool pop() noexcept static bool pop() noexcept
{ {
Scheduler & scheduler = get(); Scheduler & scheduler = get();
Deque * active = scheduler._worker[thread_id]._active.load(); Deque * deque = scheduler._worker[thread_id]._deque.load();
bool result = active->pop(); bool result = deque->pop();
if (auto array = active->reclaim()) if (auto garbage = deque->reclaim())
{ {
scheduler._gc.retire(array); scheduler._gc.retire(garbage);
} }
return result; return result;
} }
...@@ -63,24 +63,23 @@ namespace typon ...@@ -63,24 +63,23 @@ namespace typon
{ {
Scheduler & scheduler = get(); Scheduler & scheduler = get();
Worker & worker = scheduler._worker[thread_id]; Worker & worker = scheduler._worker[thread_id];
Deque * deque = worker._active.load(); Deque * deque = worker._deque.load();
worker._active.store(nullptr); worker._deque.store(nullptr);
deque->suspend(coroutine); deque->suspend(coroutine);
for (uint i = 0; i < scheduler._concurrency * 2; i++) uint id = fdt::random::random() % scheduler._concurrency;
{ scheduler._worker[id].add(deque);
uint id = fdt::random::random() % scheduler._concurrency;
if (scheduler._worker[id].try_add(deque))
{
return deque;
}
}
worker.add(deque);
return deque; return deque;
} }
static void enable(Deque * deque) noexcept static void enable(Deque * deque) noexcept
{ {
deque->_resumable.store(true); auto state = deque->_state.exchange(Deque::Resumable);
if (state == Deque::Empty)
{
Scheduler & scheduler = get();
uint id = fdt::random::random() % scheduler._concurrency;
scheduler._worker[id].add(deque);
}
get()._notifyer.notify_one(); get()._notifyer.notify_one();
} }
......
...@@ -43,7 +43,7 @@ namespace typon ...@@ -43,7 +43,7 @@ namespace typon
}; };
std::mutex _mutex; std::mutex _mutex;
std::atomic<Deque *> _active {nullptr}; std::atomic<Deque *> _deque {nullptr};
std::vector<Deque *> _pool; std::vector<Deque *> _pool;
fdt::lock_free::garbage_collector * _gc; fdt::lock_free::garbage_collector * _gc;
...@@ -53,29 +53,29 @@ namespace typon ...@@ -53,29 +53,29 @@ namespace typon
{ {
delete deque; delete deque;
} }
if (auto active = _active.load()) if (auto deque = _deque.load())
{ {
delete active; delete deque;
} }
} }
void resume(Work & work) noexcept void resume(Work & work) noexcept
{ {
auto active = _active.load(); auto deque = _deque.load();
if (work._state == Work::Resumable) if (work._state == Work::Resumable)
{ {
_active.store(work._deque); _deque.store(work._deque);
if (active) if (deque)
{ {
_gc->retire(active); _gc->retire(deque);
} }
work._deque->resume(); work._deque->resume();
} }
else else
{ {
if (!active) if (!deque)
{ {
_active.store(new Deque()); _deque.store(new Deque());
} }
work._task.resume(); work._task.resume();
} }
...@@ -105,43 +105,53 @@ namespace typon ...@@ -105,43 +105,53 @@ namespace typon
return {}; return {};
} }
std::lock_guard lock(_mutex, std::adopt_lock); std::lock_guard lock(_mutex, std::adopt_lock);
auto active = _active.load(); auto deque = _deque.load();
auto total = _pool.size() + bool(active); auto total = _pool.size() + bool(deque);
if (total == 0) if (total == 0)
{ {
return {}; return {};
} }
auto index = fdt::random::random64() % (_pool.size() + bool(active)); auto index = fdt::random::random64() % total;
if (index == _pool.size()) if (index == _pool.size())
{ {
if (auto task = active->steal()) if (auto task = deque->steal())
{ {
task.thefts()++; task.thefts()++;
return task; return task;
} }
return {}; return {};
} }
auto deque = _pool[index]; deque = _pool[index];
if (!deque->_resumable.load()) if (deque->_state.load() == Deque::Resumable)
{ {
auto task = deque->pop_top(); if (index < _pool.size() - 1)
if (auto garbage = deque->reclaim())
{
delete garbage;
}
if (task)
{ {
task.thefts()++; _pool[index] = _pool.back();
return task;
} }
return {}; _pool.pop_back();
return deque;
}
auto task = deque->pop_top();
if (auto garbage = deque->reclaim())
{
delete garbage;
}
if (task)
{
task.thefts()++;
return task;
} }
if (index < _pool.size() - 1) if (index < _pool.size() - 1)
{ {
_pool[index] = _pool.back(); _pool[index] = _pool.back();
} }
_pool.pop_back(); _pool.pop_back();
return deque; Deque::State expected = Deque::Suspended;
if (!deque->_state.compare_exchange_strong(expected, Deque::Empty))
{
return deque;
}
return {};
} }
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment