Commit 0ea612aa authored by Xavier Thompson's avatar Xavier Thompson

Add lazy tasks and fork/join scheduler

parents
#ifndef TYPON_CONTINUATION_HPP_INCLUDED
#define TYPON_CONTINUATION_HPP_INCLUDED
#include <atomic>
#include <coroutine>
#include <cstdint>
#include <exception>
#include <limits>
namespace typon::rt
{
struct coroutine_node
{
coroutine_node * _next;
std::coroutine_handle<> _coroutine;
std::exception_ptr * _exception;
};
struct coroutine_list
{
coroutine_node * _first = nullptr;
template <typename Promise>
void insert(std::coroutine_handle<Promise> coroutine) noexcept
{
std::exception_ptr * exception = &(coroutine.promise()._exception);
coroutine.promise()._node = { _first, coroutine, exception };
_first = &(coroutine.promise()._node);
}
void check_exceptions()
{
for (auto node = _first; node != nullptr; node = node->_next)
{
std::exception_ptr & exception = *(node->_exception);
if (exception)
{
std::rethrow_exception(exception);
}
}
}
void clear() noexcept
{
auto next = _first;
while(next)
{
auto node = next;
next = next->_next;
node->_coroutine.destroy();
}
_first = nullptr;
}
};
struct continuation_data
{
using u64 = std::uint_fast64_t;
static constexpr u64 UMAX = std::numeric_limits<u64>::max();
std::coroutine_handle<> _coroutine;
std::coroutine_handle<> _continuation;
coroutine_list _children;
std::atomic<u64> _n = UMAX;
u64 _thefts = 0;
continuation_data(std::coroutine_handle<> coroutine)
: _coroutine(coroutine)
{}
};
struct continuation_handle
{
continuation_data * _data;
continuation_handle() noexcept {}
template <typename Promise>
continuation_handle(std::coroutine_handle<Promise> coroutine) noexcept
: _data(&(coroutine.promise()._data))
{}
auto & thefts() noexcept
{
return _data->_thefts;
}
auto & children() noexcept
{
return _data->_children;
}
auto & n() noexcept
{
return _data->_n;
}
void resume()
{
_data->_coroutine.resume();
}
operator std::coroutine_handle<>() noexcept
{
return _data->_coroutine;
}
std::coroutine_handle<> continuation() noexcept
{
if (_data->_coroutine.done())
{
return _data->_continuation;
}
return _data->_coroutine;
}
};
}
#endif // TYPON_CONTINUATION_HPP_INCLUDED
#ifndef TYPON_DEQUE_HPP_INCLUDED
#define TYPON_DEQUE_HPP_INCLUDED
#include <atomic>
#include <cstdint>
#include <memory>
#include <utility>
namespace typon::rt
{
template <typename T>
struct RingBuffer
{
using u8 = std::uint_least8_t;
using u64 = std::uint_fast64_t;
using enum std::memory_order;
const u8 _bits;
const u64 _mask;
RingBuffer * const _next;
std::atomic<T> * const _array;
RingBuffer(u8 bits, RingBuffer * next = nullptr) noexcept
: _bits(bits)
, _mask(this->capacity() - 1)
, _next(next)
, _array(new std::atomic<T>[this->capacity()])
{}
~RingBuffer()
{
delete [] _array;
if (_next)
{
delete _next;
}
}
u64 capacity() noexcept
{
return u64(1) << _bits;
}
void put(u64 index, T object) noexcept
{
_array[index & _mask].store(std::move(object), relaxed);
}
T get(u64 index) noexcept
{
return _array[index & _mask].load(relaxed);
}
RingBuffer * fill(RingBuffer * sink, u64 start, u64 end) noexcept
{
for (u64 i = start; i < end; i++)
{
sink->put(i, get(i));
}
return sink;
}
RingBuffer * grow(u64 start, u64 end) noexcept
{
return fill(new RingBuffer(_bits + 1, this), start, end);
}
RingBuffer * shrink(u64 start, u64 end) noexcept
{
return fill(std::exchange(_next, nullptr), start, end);
}
};
template <typename T>
struct Optional
{
enum State : unsigned char { empty = 0, abort = 1, engaged = 2};
State _state;
union
{
T _value;
};
Optional(State state = empty) noexcept : _state(state) {}
Optional(T value) noexcept : _state(engaged), _value(value) {}
~Optional()
{
if (_state == engaged)
{
std::destroy_at(std::addressof(_value));
}
}
operator bool() noexcept
{
return _state == engaged;
}
State state() noexcept
{
return _state;
}
T * operator->() noexcept
{
return std::addressof(_value);
}
T & operator*() noexcept
{
return _value;
}
};
template <typename T>
struct StealingDeque
{
using u8 = typename RingBuffer<T>::u8;
using u64 = typename RingBuffer<T>::u64;
using Array = RingBuffer<T>;
using enum std::memory_order;
std::atomic<u64> _top {1};
std::atomic<u64> _bottom {1};
std::atomic<Array *> _array;
StealingDeque(u8 bits = 2) noexcept
: _array(new Array(bits))
{}
~StealingDeque()
{
delete _array;
}
void push(T x) noexcept
{
u64 t = _top.load(relaxed);
u64 b = _bottom.load(acquire);
Array * array = _array.load(relaxed);
if (t - b > array->capacity() - 1)
{
array = array->grow(b, t);
_array.store(array);
}
array->put(t, x);
std::atomic_thread_fence(release);
_top.store(t + 1, relaxed);
}
Optional<T> pop() noexcept
{
u64 t = _top.load(relaxed) - 1;
Array * array = _array.load(relaxed);
_top.store(t, relaxed);
std::atomic_thread_fence(seq_cst);
u64 b = _bottom.load(relaxed);
Optional<T> x {};
if (b <= t)
{
x = array->get(t);
if (b == t)
{
if (!_bottom.compare_exchange_strong(b, b + 1, seq_cst, relaxed))
{
x = {};
}
_top.store(t + 1, relaxed);
}
}
else
{
_top.store(t + 1, relaxed);
}
return x;
}
Optional<T> steal() noexcept
{
u64 b = _bottom.load(acquire);
std::atomic_thread_fence(seq_cst);
u64 t = _top.load(acquire);
Optional<T> x {};
if (b < t)
{
Array * array = _array.load(consume);
x = array->get(b);
if (!_bottom.compare_exchange_strong(b, b + 1, seq_cst, relaxed))
{
return {Optional<T>::abort};
}
}
return x;
}
};
}
namespace riften::detail {
template <typename T>
using Deque = typon::rt::StealingDeque<T>;
}
#endif // TYPON_DEQUE_HPP_INCLUDED
#ifndef TYPON_EVENTCOUNT_HPP_INCLUDED
#define TYPON_EVENTCOUNT_HPP_INCLUDED
#include <atomic>
#include <cstdint>
namespace typon::rt
{
template <unsigned int N = 10>
struct EventCount
{
using u64 = std::uint_fast64_t;
static_assert(N < 16);
static constexpr u64 shift = u64(1) << N;
static constexpr u64 mask = shift - 1;
std::atomic<u64> _state {0};
auto prepare_wait() noexcept
{
return _state.fetch_add(1, std::memory_order_acq_rel);
}
void cancel_wait() noexcept
{
_state.fetch_sub(1, std::memory_order_seq_cst);
}
void wait(u64 state) noexcept
{
for(;;)
{
_state.wait(state, std::memory_order_acquire);
auto newstate = _state.load(std::memory_order_acquire);
if ((newstate ^ state) & (~mask))
{
break;
}
state = newstate;
}
_state.fetch_sub(1, std::memory_order_seq_cst);
}
void notify_one() noexcept
{
_state.fetch_add(shift, std::memory_order_acq_rel);
_state.notify_one();
}
void notify_all() noexcept
{
_state.fetch_add(shift, std::memory_order_acq_rel);
_state.notify_all();
}
};
}
#endif // TYPON_EVENTCOUNT_HPP_INCLUDED
#ifndef TYPON_FORK_HPP_INCLUDED
#define TYPON_FORK_HPP_INCLUDED
#include <coroutine>
#include <cstdint>
#include <continuation.hpp>
#include <future.hpp>
#include <result.hpp>
#include <scheduler.hpp>
namespace typon::rt
{
template <typename T = void>
struct [[nodiscard]] Fork
{
struct promise_type;
using u64 = continuation_data::u64;
std::coroutine_handle<promise_type> _coroutine;
Fork(std::coroutine_handle<promise_type> coroutine) noexcept : _coroutine(coroutine) {}
Fork(const Fork &) = delete;
Fork& operator=(const Fork &) = delete;
Fork(Fork &&) noexcept = default;
Fork& operator=(Fork &&) noexcept = default;
struct promise_type : Result<T>
{
continuation_handle _continuation;
coroutine_node _node;
Fork get_return_object() noexcept
{
return { std::coroutine_handle<promise_type>::from_promise(*this) };
}
std::suspend_always initial_suspend() noexcept
{
return {};
}
auto final_suspend() noexcept
{
struct awaitable : std::suspend_always
{
std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept
{
if (Optional continuation = Scheduler::pop())
{
return *continuation;
}
auto & continuation = coroutine.promise()._continuation;
u64 n = continuation.n().fetch_sub(1, std::memory_order_acq_rel);
if (n == 1)
{
return continuation.continuation();
}
return std::noop_coroutine();
}
};
return awaitable {};
}
};
struct awaitable
{
std::coroutine_handle<promise_type> _coroutine;
u64 _thefts;
awaitable(std::coroutine_handle<promise_type> coroutine) noexcept
: _coroutine(coroutine)
{}
bool await_ready() noexcept
{
return false;
}
template <typename Promise>
auto await_suspend(std::coroutine_handle<Promise> continuation)
{
_coroutine.promise()._continuation = continuation;
_thefts = _coroutine.promise()._continuation.thefts();
std::coroutine_handle<> on_stack_handle = _coroutine;
Scheduler::push(continuation);
return on_stack_handle;
}
auto await_resume()
{
auto continuation = _coroutine.promise()._continuation;
bool stolen = continuation.thefts() > _thefts;
if (stolen)
{
continuation.children().insert(_coroutine);
}
return Future<promise_type>(_coroutine, !stolen);
}
};
auto operator co_await() &&
{
return awaitable { _coroutine };
}
};
}
#endif // TYPON_FORK_HPP_INCLUDED
#ifndef TYPON_FUTURE_HPP_INCLUDED
#define TYPON_FUTURE_HPP_INCLUDED
#include <coroutine>
#include <memory>
#include <type_traits>
#include <utility>
#include <result.hpp>
namespace typon::rt
{
template <typename Promise>
using value_type = typename Promise::value_type;
template <typename P>
concept Void = std::is_void_v<value_type<P>>;
template <typename P>
concept Reference = std::is_reference_v<value_type<P>>;
template <typename P>
concept Complete = !Void<P> && !Reference<P>;
template <typename P>
concept Small = sizeof(value_type<P>) <= 2 * sizeof(void*);
template <typename P>
concept TriviallyCopyable = std::is_trivially_copyable_v<value_type<P>>;
struct CoroutineGuard
{
std::coroutine_handle<> _coroutine;
~CoroutineGuard()
{
_coroutine.destroy();
}
};
template <typename Promise>
struct Future
{
template <typename T>
struct always_false
{
static constexpr bool value { false };
};
static_assert(always_false<Promise>::value, "Unexpected Promise type for Future");
};
template <typename Promise>
requires Complete<Promise> && (!Small<Promise>)
struct Future<Promise>
{
std::coroutine_handle<Promise> _coroutine;
bool _owning;
Future(std::coroutine_handle<Promise> coroutine, bool ready)
{
_coroutine = coroutine;
_owning = ready;
if (ready)
{
if (coroutine.promise()._exception)
{
std::rethrow_exception(coroutine.promise()._exception);
}
}
}
Future(const Future &) = delete;
Future& operator=(const Future &) = delete;
Future(Future && other) noexcept
: _coroutine(other._coroutine)
, _owning(std::exchange(other._owning, false))
{}
Future& operator=(Future && other) noexcept
{
std::swap(_owning, other._owning);
std::swap(_coroutine, other._coroutine);
return *this;
}
~Future()
{
if (_owning)
{
_coroutine.destroy();
}
}
auto get()
{
return _coroutine.promise().get();
}
};
template <typename Promise>
requires Complete<Promise> && Small<Promise> && (!TriviallyCopyable<Promise>)
struct Future<Promise>
{
using value_type = typename Promise::value_type;
std::coroutine_handle<Promise> _coroutine;
union
{
value_type _result;
};
Future(std::coroutine_handle<Promise> coroutine, bool ready)
{
if (ready)
{
CoroutineGuard guard { coroutine };
std::construct_at(&(_result), coroutine.promise().get());
}
else
{
_coroutine = coroutine;
}
}
Future(Future && other)
noexcept(std::is_nothrow_move_constructible_v<value_type>)
{
_coroutine = other._coroutine;
if (!_coroutine)
{
std::construct_at(std::addressof(_result), std::move(other._result));
}
}
Future& operator=(Future && other)
noexcept(std::is_nothrow_move_constructible_v<value_type>)
{
if (this != &other)
{
Future old { std::move(*this) };
_coroutine = other._coroutine;
if (!_coroutine)
{
std::construct_at(std::addressof(_result), std::move(other._result));
}
}
return *this;
}
~Future()
{
if (!_coroutine)
{
std::destroy_at(&(_result));
}
}
auto get()
{
if (_coroutine)
{
return _coroutine.promise().get();
}
return _result;
}
};
template <typename Promise>
requires Complete<Promise> && Small<Promise> && TriviallyCopyable<Promise>
struct Future<Promise>
{
using value_type = typename Promise::value_type;
std::coroutine_handle<Promise> _coroutine;
union
{
value_type _result;
};
Future(std::coroutine_handle<Promise> coroutine, bool ready)
{
if (ready)
{
CoroutineGuard guard { coroutine };
std::construct_at(&(_result), coroutine.promise().get());
}
else
{
_coroutine = coroutine;
}
}
~Future()
{
if (!_coroutine)
{
std::destroy_at(&(_result));
}
}
auto get()
{
if (_coroutine)
{
return _coroutine.promise().get();
}
return _result;
}
};
template <Reference Promise>
struct Future<Promise>
{
using value_type = typename Promise::value_type;
std::coroutine_handle<Promise> _coroutine;
std::remove_reference<value_type> * _result;
Future(std::coroutine_handle<Promise> coroutine, bool ready)
{
if (ready)
{
CoroutineGuard guard { coroutine };
_result = std::addressof(coroutine.promise().get());
}
else
{
_coroutine = coroutine;
}
}
value_type get()
{
if (_coroutine)
{
return _coroutine.promise().get();
}
return *_result;
}
};
template <Void Promise>
struct Future<Promise>
{
using value_type = typename Promise::value_type;
std::coroutine_handle<Promise> _coroutine;
Future(std::coroutine_handle<Promise> coroutine, bool ready)
{
if (ready)
{
CoroutineGuard guard { coroutine };
coroutine.promise().get();
}
else
{
_coroutine = coroutine;
}
}
void get()
{
if (_coroutine)
{
_coroutine.promise().get();
}
}
};
}
#endif // TYPON_FUTURE_HPP_INCLUDED
#ifndef TYPON_JOIN_HPP_INCLUDED
#define TYPON_JOIN_HPP_INCLUDED
#include <coroutine>
#include <utility>
#include <continuation.hpp>
#include <result.hpp>
namespace typon::rt
{
struct [[nodiscard]] Sync {};
template <typename T = void>
struct [[nodiscard]] Join
{
struct promise_type;
std::coroutine_handle<promise_type> _coroutine;
Join(std::coroutine_handle<promise_type> coroutine) noexcept : _coroutine(coroutine) {}
Join(const Join &) = delete;
Join & operator=(const Join &) = delete;
Join(Join && other) noexcept
: _coroutine(std::exchange(other._coroutine, nullptr))
{}
Join & operator=(Join other)
{
std::swap(_coroutine, other._coroutine);
return *this;
}
~Join()
{
if (_coroutine)
{
_coroutine.promise()._data._children.clear();
_coroutine.destroy();
}
}
struct promise_type : Result<T>
{
using u64 = continuation_data::u64;
static constexpr u64 UMAX = continuation_data::UMAX;
continuation_data _data;
promise_type() noexcept
: _data(std::coroutine_handle<promise_type>::from_promise(*this))
{}
Join get_return_object() noexcept
{
return { std::coroutine_handle<promise_type>::from_promise(*this) };
}
std::suspend_always initial_suspend() noexcept
{
return {};
}
template <typename U>
decltype(auto) await_transform(U && expr) noexcept
{
return std::forward<U>(expr);
}
auto await_transform(Sync &&) noexcept
{
struct awaitable
{
continuation_data & _data;
bool await_ready() noexcept
{
if (u64 thefts = _data._thefts)
{
u64 n = _data._n.load(std::memory_order_acquire);
if (n - (UMAX - thefts) == 0)
{
return true;
}
return false;
}
return true;
}
std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept
{
u64 thefts = _data._thefts;
u64 n = _data._n.fetch_sub(UMAX - thefts, std::memory_order_acq_rel);
if (n - (UMAX - thefts) == 0)
{
return coroutine;
}
return std::noop_coroutine();
}
void await_resume() noexcept
{
_data._thefts = 0;
_data._n.store(UMAX, std::memory_order_release);
}
};
return awaitable { _data };
}
auto final_suspend() noexcept
{
struct awaitable : std::suspend_always
{
std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept
{
continuation_data & data = coroutine.promise()._data;
u64 thefts = data._thefts;
u64 n = data._n.fetch_sub(UMAX - thefts, std::memory_order_acq_rel);
if (n - (UMAX - thefts) == 0)
{
return data._continuation;
}
return std::noop_coroutine();
}
};
return awaitable {};
}
};
auto operator co_await() && noexcept
{
struct awaitable
{
std::coroutine_handle<promise_type> _coroutine;
bool await_ready() noexcept
{
return false;
}
std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation) noexcept
{
_coroutine.promise()._data._continuation = continuation;
return _coroutine;
}
decltype(auto) await_resume()
{
_coroutine.promise()._data._children.check_exceptions();
return _coroutine.promise().get();
}
};
return awaitable { _coroutine };
}
};
}
#endif // TYPON_JOIN_HPP_INCLUDED
#ifndef TYPOn_RANDOM_HPP_INCLUDED
#define TYPOn_RANDOM_HPP_INCLUDED
#include <random>
namespace typon::rt
{
struct Random
{
std::mt19937 _rng;
Random() noexcept : _rng{std::random_device{}()} {}
unsigned int operator()() noexcept
{
return _rng();
}
};
namespace random
{
static thread_local Random random;
}
}
#endif // TYPOn_RANDOM_HPP_INCLUDED
#ifndef TYPON_RESULT_HPP_INCLUDED
#define TYPON_RESULT_HPP_INCLUDED
#include <exception>
#include <memory>
#include <type_traits>
#include <utility>
namespace typon
{
template <typename T>
struct Result
{
using value_type = T;
bool _valid = false;
std::exception_ptr _exception;
union
{
T _value;
};
~Result()
{
if (_valid)
{
std::destroy_at(std::addressof(_value));
}
}
template <typename U>
void return_value(U&& expr) noexcept(std::is_nothrow_constructible_v<T, U&&>)
{
std::construct_at(std::addressof(_value), std::forward<U>(expr));
_valid = true;
}
void unhandled_exception() noexcept
{
_exception = std::current_exception();
}
T& get() &
{
if (_exception)
{
std::rethrow_exception(std::exchange(_exception, nullptr));
}
return _value;
}
T&& get() &&
{
if (_exception)
{
std::rethrow_exception(std::exchange(_exception, nullptr));
}
return std::move(_value);
}
};
template <typename T>
struct Result<T&>
{
using value_type = T&;
T* _value;
std::exception_ptr _exception;
void return_value(T& expr) noexcept
{
_value = std::addressof(expr);
}
void unhandled_exception() noexcept
{
_exception = std::current_exception();
}
T& get() &
{
if (_exception)
{
std::rethrow_exception(std::exchange(_exception, nullptr));
}
return *_value;
}
};
template <>
struct Result<void>
{
using value_type = void;
std::exception_ptr _exception;
void return_void() noexcept {}
void unhandled_exception() noexcept
{
_exception = std::current_exception();
}
void get()
{
if (_exception)
{
std::rethrow_exception(std::exchange(_exception, nullptr));
}
}
};
}
#endif // TYPON_RESULT_HPP_INCLUDED
#ifndef TYPON_ROOT_HPP_INCLUDED
#define TYPON_ROOT_HPP_INCLUDED
#include <atomic>
#include <coroutine>
#include <cstdint>
//
#include <continuation.hpp>
#include <result.hpp>
#include <scheduler.hpp>
namespace typon::rt
{
struct [[nodiscard]] Root
{
struct promise_type;
std::coroutine_handle<promise_type> _coroutine;
~Root()
{
_coroutine.destroy();
}
struct promise_type : Result<void>
{
// std::atomic_bool _done = false;
continuation_data _data;
promise_type() noexcept
: _data(std::coroutine_handle<promise_type>::from_promise(*this))
{}
Root get_return_object() noexcept
{
return { std::coroutine_handle<promise_type>::from_promise(*this) };
}
std::suspend_always initial_suspend() noexcept
{
return {};
}
auto final_suspend() noexcept
{
struct awaitable : std::suspend_always
{
void await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept
{
// !! bad racy
coroutine.promise()._data._n.store(0, std::memory_order_release);
coroutine.promise()._data._n.notify_one();
}
};
return awaitable {};
}
};
void call() &&
{
Scheduler::get();
Scheduler::schedule(_coroutine);
_coroutine.promise()._data._n.wait(continuation_data::UMAX, std::memory_order_acquire);
}
};
}
#endif // TYPON_ROOT_HPP_INCLUDED
#ifndef TYPON_SCHEDULER_HPP_INCLUDED
#define TYPON_SCHEDULER_HPP_INCLUDED
#include <atomic>
#include <cstdint>
#include <exception>
#include <thread>
#include <vector>
#include <continuation.hpp>
#include <eventcount.hpp>
#include <deque.hpp>
#include <random.hpp>
namespace typon::rt
{
struct Scheduler
{
using uint = unsigned int;
using Task = Optional<continuation_handle>;
using Deque = StealingDeque<continuation_handle>;
static inline thread_local uint thread_id;
std::atomic<uint> _actives = 0;
std::atomic<uint> _thieves = 0;
std::vector<Deque> _deque;
std::vector<std::thread> _thread;
std::atomic_bool _done {false};
EventCount<> _notifyer;
const uint _parallelism;
static Scheduler & get() noexcept
{
static Scheduler scheduler {std::thread::hardware_concurrency()};
return scheduler;
}
static void push(continuation_handle task) noexcept
{
get()._deque[thread_id].push(task);
}
static void schedule(continuation_handle task) noexcept
{
Scheduler & scheduler = get();
scheduler._deque[thread_id].push(task);
scheduler._notifyer.notify_one();
}
static Task pop() noexcept
{
return get()._deque[thread_id].pop();
}
Scheduler(uint parallelism) noexcept
: _deque(parallelism + 1)
, _parallelism(parallelism)
{
thread_id = parallelism;
for (uint id = 0; id < parallelism; id++)
{
_thread.emplace_back([this, id]() {
thread_id = id;
Task task {};
for(;;)
{
exploit_task(task);
if (!wait_for_task(task))
{
break;
}
}
});
}
}
~Scheduler() noexcept
{
_done.store(true);
_notifyer.notify_all();
for (auto & t : _thread)
{
t.join();
}
}
void exploit_task(Task & task) noexcept
{
if (task)
{
if (_actives.fetch_add(1) == 0)
{
if (_thieves.load() == 0)
{
_notifyer.notify_one();
}
}
while(task)
{
task->resume();
task = _deque[thread_id].pop();
if (task)
{
printf("[%u] ERROR unexpectedly pop valid task %p\n", thread_id, task->_data);
}
}
_actives.fetch_sub(1);
}
}
void explore_task(Task & task) noexcept
{
for (uint i = 0; i < _parallelism * 2 + 1; i++)
{
uint id = random::random() % _parallelism;
if (id == thread_id)
{
task = _deque.back().steal();
}
else
{
task = _deque[id].steal();
}
if (task)
{
task->thefts()++;
break;
}
}
}
bool wait_for_task(Task & task) noexcept
{
wait_for_task:
_thieves.fetch_add(1);
explore_task:
explore_task(task);
if (task)
{
if (_thieves.fetch_sub(1) == 1)
{
_notifyer.notify_one();
}
return true;
}
auto key = _notifyer.prepare_wait();
task = _deque.back().steal();
if (task.state() != Task::empty)
{
_notifyer.cancel_wait();
if (task)
{
if (_thieves.fetch_sub(1) == 1)
{
_notifyer.notify_one();
}
return true;
}
goto explore_task;
}
if (_done.load())
{
_notifyer.cancel_wait();
_notifyer.notify_all();
_thieves.fetch_sub(1);
return false;
}
if (_thieves.fetch_sub(1) == 1)
{
if (_actives.load() > 0)
{
_notifyer.cancel_wait();
goto wait_for_task;
}
}
_notifyer.wait(key);
return true;
}
};
}
#endif // TYPON_SCHEDULER_HPP_INCLUDED
#ifndef TYPON_TASK_HPP_INCLUDED
#define TYPON_TASK_HPP_INCLUDED
#include <coroutine>
#include <utility>
#include <result.hpp>
namespace typon::rt
{
template <typename T = void>
struct [[nodiscard]] Task
{
struct promise_type;
std::coroutine_handle<promise_type> _coroutine;
Task(std::coroutine_handle<promise_type> coroutine) noexcept : _coroutine(coroutine) {}
Task(const Task &) = delete;
Task & operator=(const Task &) = delete;
Task(Task && other) noexcept
: _coroutine(std::exchange(other._coroutine, nullptr))
{}
Task & operator=(Task other)
{
std::swap(_coroutine, other._coroutine);
return *this;
}
~Task()
{
if (_coroutine)
{
_coroutine.destroy();
}
}
struct promise_type : Result<T>
{
std::coroutine_handle<> _continuation;
Task get_return_object() noexcept
{
return { std::coroutine_handle<promise_type>::from_promise(*this) };
}
std::suspend_always initial_suspend() noexcept
{
return {};
}
auto final_suspend() noexcept
{
struct awaitable : std::suspend_always
{
std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept
{
return coroutine.promise()._continuation;
}
};
return awaitable {};
}
};
auto operator co_await() && noexcept
{
struct awaitable
{
std::coroutine_handle<promise_type> _coroutine;
bool await_ready() noexcept
{
return false;
}
std::coroutine_handle<> await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept
{
_coroutine.promise()._continuation = awaiting_coroutine;
return _coroutine;
}
decltype(auto) await_resume()
{
return _coroutine.promise().get();
}
};
return awaitable { _coroutine };
}
auto discard() && noexcept
{
struct awaitable : std::suspend_always
{
std::coroutine_handle<promise_type> _coroutine;
std::coroutine_handle<> await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept
{
_coroutine.promise()._continuation = awaiting_coroutine;
return _coroutine;
}
};
return awaitable { {}, _coroutine };
}
decltype(auto) call() &&
{
_coroutine.promise()._continuation = std::noop_coroutine();
_coroutine.resume();
return _coroutine.promise().get();
}
void call_discard() &&
{
_coroutine.promise()._continuation = std::noop_coroutine();
_coroutine.resume();
}
};
}
#endif // TYPON_TASK_HPP_INCLUDED
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment