Commit 89b06540 authored by Xavier Thompson's avatar Xavier Thompson

Rename Deque into Stack

parent f16ce513
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
#include <typon/fundamental/scope.hpp> #include <typon/fundamental/scope.hpp>
#include <typon/core/deque.hpp> #include <typon/core/stack.hpp>
#include <typon/core/scheduler.hpp> #include <typon/core/scheduler.hpp>
...@@ -45,7 +45,7 @@ namespace typon ...@@ -45,7 +45,7 @@ namespace typon
auto state = _state.exchange(ready, std::memory_order_acq_rel); auto state = _state.exchange(ready, std::memory_order_acq_rel);
if (state != no_waiter) if (state != no_waiter)
{ {
Scheduler::enable(reinterpret_cast<Deque *>(state)); Scheduler::enable(reinterpret_cast<Stack *>(state));
} }
} }
...@@ -56,11 +56,11 @@ namespace typon ...@@ -56,11 +56,11 @@ namespace typon
void await_suspend(std::coroutine_handle<> coroutine) noexcept void await_suspend(std::coroutine_handle<> coroutine) noexcept
{ {
auto deque = Scheduler::suspend(coroutine); auto stack = Scheduler::suspend(coroutine);
auto state = reinterpret_cast<std::uintptr_t>(deque); auto state = reinterpret_cast<std::uintptr_t>(stack);
if (_state.exchange(state, std::memory_order_acq_rel) == ready) if (_state.exchange(state, std::memory_order_acq_rel) == ready)
{ {
Scheduler::enable(deque); Scheduler::enable(stack);
} }
} }
...@@ -92,7 +92,7 @@ namespace typon ...@@ -92,7 +92,7 @@ namespace typon
auto state = _state.exchange(ready, std::memory_order_acq_rel); auto state = _state.exchange(ready, std::memory_order_acq_rel);
if (state != no_waiter) if (state != no_waiter)
{ {
Scheduler::enable(reinterpret_cast<Deque *>(state)); Scheduler::enable(reinterpret_cast<Stack *>(state));
} }
} }
...@@ -103,11 +103,11 @@ namespace typon ...@@ -103,11 +103,11 @@ namespace typon
void await_suspend(std::coroutine_handle<> coroutine) noexcept void await_suspend(std::coroutine_handle<> coroutine) noexcept
{ {
auto deque = Scheduler::suspend(coroutine); auto stack = Scheduler::suspend(coroutine);
auto state = reinterpret_cast<std::uintptr_t>(deque); auto state = reinterpret_cast<std::uintptr_t>(stack);
if (_state.exchange(state, std::memory_order_acq_rel) == ready) if (_state.exchange(state, std::memory_order_acq_rel) == ready)
{ {
Scheduler::enable(deque); Scheduler::enable(stack);
} }
} }
...@@ -133,7 +133,7 @@ namespace typon ...@@ -133,7 +133,7 @@ namespace typon
auto state = _state.exchange(ready, std::memory_order_acq_rel); auto state = _state.exchange(ready, std::memory_order_acq_rel);
if (state != no_waiter) if (state != no_waiter)
{ {
Scheduler::enable(reinterpret_cast<Deque *>(state)); Scheduler::enable(reinterpret_cast<Stack *>(state));
} }
} }
...@@ -144,11 +144,11 @@ namespace typon ...@@ -144,11 +144,11 @@ namespace typon
void await_suspend(std::coroutine_handle<> coroutine) noexcept void await_suspend(std::coroutine_handle<> coroutine) noexcept
{ {
auto deque = Scheduler::suspend(coroutine); auto stack = Scheduler::suspend(coroutine);
auto state = reinterpret_cast<std::uintptr_t>(deque); auto state = reinterpret_cast<std::uintptr_t>(stack);
if (_state.exchange(state, std::memory_order_acq_rel) == ready) if (_state.exchange(state, std::memory_order_acq_rel) == ready)
{ {
Scheduler::enable(deque); Scheduler::enable(stack);
} }
} }
...@@ -172,7 +172,7 @@ namespace typon ...@@ -172,7 +172,7 @@ namespace typon
auto state = _state.exchange(ready, std::memory_order_acq_rel); auto state = _state.exchange(ready, std::memory_order_acq_rel);
if (state != no_waiter) if (state != no_waiter)
{ {
Scheduler::enable(reinterpret_cast<Deque *>(state)); Scheduler::enable(reinterpret_cast<Stack *>(state));
} }
} }
...@@ -183,11 +183,11 @@ namespace typon ...@@ -183,11 +183,11 @@ namespace typon
void await_suspend(std::coroutine_handle<> coroutine) noexcept void await_suspend(std::coroutine_handle<> coroutine) noexcept
{ {
auto deque = Scheduler::suspend(coroutine); auto stack = Scheduler::suspend(coroutine);
auto state = reinterpret_cast<std::uintptr_t>(deque); auto state = reinterpret_cast<std::uintptr_t>(stack);
if (_state.exchange(state, std::memory_order_acq_rel) == ready) if (_state.exchange(state, std::memory_order_acq_rel) == ready)
{ {
Scheduler::enable(deque); Scheduler::enable(stack);
} }
} }
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
#include <typon/fundamental/random.hpp> #include <typon/fundamental/random.hpp>
#include <typon/core/continuation.hpp> #include <typon/core/continuation.hpp>
#include <typon/core/deque.hpp> #include <typon/core/stack.hpp>
#include <typon/core/worker.hpp> #include <typon/core/worker.hpp>
...@@ -38,7 +38,7 @@ namespace typon ...@@ -38,7 +38,7 @@ namespace typon
static void schedule(std::coroutine_handle<> task) noexcept static void schedule(std::coroutine_handle<> task) noexcept
{ {
uint id = fdt::random::random() % get()._concurrency; uint id = fdt::random::random() % get()._concurrency;
get()._worker[id].add(new Deque(task)); get()._worker[id].add(new Stack(task));
get()._stealables.fetch_add(1); get()._stealables.fetch_add(1);
get()._notifyer.notify_one(); get()._notifyer.notify_one();
} }
...@@ -56,19 +56,19 @@ namespace typon ...@@ -56,19 +56,19 @@ namespace typon
static auto suspend(std::coroutine_handle<> coroutine) noexcept static auto suspend(std::coroutine_handle<> coroutine) noexcept
{ {
Worker & worker = get()._worker[thread_id]; Worker & worker = get()._worker[thread_id];
auto deque = worker.suspend(coroutine); auto stack = worker.suspend(coroutine);
uint id = fdt::random::random() % get()._concurrency; uint id = fdt::random::random() % get()._concurrency;
get()._worker[id].add(deque); get()._worker[id].add(stack);
return deque; return stack;
} }
static void enable(Deque * deque) noexcept static void enable(Stack * stack) noexcept
{ {
auto state = deque->_state.exchange(Deque::Resumable); auto state = stack->_state.exchange(Stack::Resumable);
if (state == Deque::Empty) if (state == Stack::Empty)
{ {
uint id = fdt::random::random() % get()._concurrency; uint id = fdt::random::random() % get()._concurrency;
get()._worker[id].add(deque); get()._worker[id].add(stack);
get()._stealables.fetch_add(1); get()._stealables.fetch_add(1);
} }
get()._notifyer.notify_one(); get()._notifyer.notify_one();
......
#ifndef TYPON_FUNDAMENTAL_DEQUE_HPP_INCLUDED #ifndef TYPON_FUNDAMENTAL_STACK_HPP_INCLUDED
#define TYPON_FUNDAMENTAL_DEQUE_HPP_INCLUDED #define TYPON_FUNDAMENTAL_STACK_HPP_INCLUDED
#include <atomic> #include <atomic>
#include <cstdint> #include <cstdint>
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
namespace typon namespace typon
{ {
struct Deque struct Stack
{ {
using ring_buffer = fdt::lock_free::ring_buffer<Continuation>; using ring_buffer = fdt::lock_free::ring_buffer<Continuation>;
using u64 = ring_buffer::u64; using u64 = ring_buffer::u64;
...@@ -28,14 +28,14 @@ namespace typon ...@@ -28,14 +28,14 @@ namespace typon
std::coroutine_handle<> _coroutine; std::coroutine_handle<> _coroutine;
std::atomic<State> _state; std::atomic<State> _state;
Deque() noexcept {} Stack() noexcept {}
Deque(std::coroutine_handle<> coroutine) noexcept Stack(std::coroutine_handle<> coroutine) noexcept
: _coroutine(coroutine) : _coroutine(coroutine)
, _state(Resumable) , _state(Resumable)
{} {}
~Deque() ~Stack()
{ {
delete _buffer.load(relaxed); delete _buffer.load(relaxed);
} }
...@@ -139,4 +139,4 @@ namespace typon ...@@ -139,4 +139,4 @@ namespace typon
} }
#endif // TYPON_FUNDAMENTAL_DEQUE_HPP_INCLUDED #endif // TYPON_FUNDAMENTAL_STACK_HPP_INCLUDED
...@@ -28,13 +28,13 @@ namespace typon ...@@ -28,13 +28,13 @@ namespace typon
State _state; State _state;
union union
{ {
Deque * _deque; Stack * _stack;
Continuation _task; Continuation _task;
}; };
Work() noexcept : _state(Empty) {} Work() noexcept : _state(Empty) {}
Work(Deque * deque) noexcept : _state(Resumable), _deque(deque) {} Work(Stack * stack) noexcept : _state(Resumable), _stack(stack) {}
Work(Continuation task) noexcept : _state(Stolen), _task(task) {} Work(Continuation task) noexcept : _state(Stolen), _task(task) {}
...@@ -45,71 +45,71 @@ namespace typon ...@@ -45,71 +45,71 @@ namespace typon
}; };
std::mutex _mutex; std::mutex _mutex;
std::atomic<Deque *> _deque {nullptr}; std::atomic<Stack *> _stack {nullptr};
std::vector<Deque *> _pool; std::vector<Stack *> _pool;
std::atomic_uint_fast64_t * _stealables; std::atomic_uint_fast64_t * _stealables;
fdt::lock_free::garbage_collector * _gc; fdt::lock_free::garbage_collector * _gc;
~Worker() ~Worker()
{ {
for (auto & deque : _pool) for (auto & stack : _pool)
{ {
delete deque; delete stack;
} }
if (auto deque = _deque.load()) if (auto stack = _stack.load())
{ {
delete deque; delete stack;
} }
} }
void add(Deque * deque) noexcept void add(Stack * stack) noexcept
{ {
std::lock_guard lock(_mutex); std::lock_guard lock(_mutex);
_pool.push_back(deque); _pool.push_back(stack);
} }
bool try_add(Deque * deque) noexcept bool try_add(Stack * stack) noexcept
{ {
if (!_mutex.try_lock()) if (!_mutex.try_lock())
{ {
return false; return false;
} }
std::lock_guard lock(_mutex, std::adopt_lock); std::lock_guard lock(_mutex, std::adopt_lock);
_pool.push_back(deque); _pool.push_back(stack);
return true; return true;
} }
auto suspend(std::coroutine_handle<> coroutine) noexcept auto suspend(std::coroutine_handle<> coroutine) noexcept
{ {
auto deque = _deque.load(); auto stack = _stack.load();
_deque.store(nullptr); _stack.store(nullptr);
deque->suspend(coroutine); stack->suspend(coroutine);
return deque; return stack;
} }
void resume(Work & work) noexcept void resume(Work & work) noexcept
{ {
if (work._state == Work::Resumable) if (work._state == Work::Resumable)
{ {
auto deque = _deque.load(); auto stack = _stack.load();
_deque.store(work._deque); _stack.store(work._stack);
if (deque) if (stack)
{ {
_gc->retire(deque); _gc->retire(stack);
} }
_stealables->fetch_add(1); _stealables->fetch_add(1);
work._deque->resume(); work._stack->resume();
} }
else else
{ {
if (!_deque.load()) if (!_stack.load())
{ {
_deque.store(new Deque()); _stack.store(new Stack());
} }
_stealables->fetch_add(1); _stealables->fetch_add(1);
work._task.resume(); work._task.resume();
} }
if (_deque.load()) if (_stack.load())
{ {
_stealables->fetch_sub(1); _stealables->fetch_sub(1);
} }
...@@ -117,14 +117,14 @@ namespace typon ...@@ -117,14 +117,14 @@ namespace typon
void push(Continuation task) noexcept void push(Continuation task) noexcept
{ {
_deque.load()->push(task); _stack.load()->push(task);
} }
bool pop() noexcept bool pop() noexcept
{ {
Deque * deque = _deque.load(); Stack * stack = _stack.load();
bool result = deque->pop(); bool result = stack->pop();
if (auto garbage = deque->reclaim()) if (auto garbage = stack->reclaim())
{ {
_gc->retire(garbage); _gc->retire(garbage);
} }
...@@ -138,8 +138,8 @@ namespace typon ...@@ -138,8 +138,8 @@ namespace typon
return {}; return {};
} }
std::lock_guard lock(_mutex, std::adopt_lock); std::lock_guard lock(_mutex, std::adopt_lock);
auto deque = _deque.load(); auto stack = _stack.load();
auto total = _pool.size() + bool(deque); auto total = _pool.size() + bool(stack);
if (total == 0) if (total == 0)
{ {
return {}; return {};
...@@ -147,25 +147,25 @@ namespace typon ...@@ -147,25 +147,25 @@ namespace typon
auto index = fdt::random::random64() % total; auto index = fdt::random::random64() % total;
if (index == _pool.size()) if (index == _pool.size())
{ {
if (auto task = deque->steal()) if (auto task = stack->steal())
{ {
task.thefts()++; task.thefts()++;
return task; return task;
} }
return {}; return {};
} }
deque = _pool[index]; stack = _pool[index];
if (deque->_state.load() == Deque::Resumable) if (stack->_state.load() == Stack::Resumable)
{ {
if (index < _pool.size() - 1) if (index < _pool.size() - 1)
{ {
_pool[index] = _pool.back(); _pool[index] = _pool.back();
} }
_pool.pop_back(); _pool.pop_back();
return deque; return stack;
} }
auto task = deque->pop_top(); auto task = stack->pop_top();
if (auto garbage = deque->reclaim()) if (auto garbage = stack->reclaim())
{ {
delete garbage; delete garbage;
} }
...@@ -179,10 +179,10 @@ namespace typon ...@@ -179,10 +179,10 @@ namespace typon
_pool[index] = _pool.back(); _pool[index] = _pool.back();
} }
_pool.pop_back(); _pool.pop_back();
Deque::State expected = Deque::Suspended; Stack::State expected = Stack::Suspended;
if (!deque->_state.compare_exchange_strong(expected, Deque::Empty)) if (!stack->_state.compare_exchange_strong(expected, Stack::Empty))
{ {
return deque; return stack;
} }
_stealables->fetch_sub(1); _stealables->fetch_sub(1);
return {}; return {};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment