Commit 7ee68987 authored by Xavier Thompson's avatar Xavier Thompson

Refactor continuation.hpp introducing parent.hpp

parent 13f30eb0
#ifndef TYPON_CORE_CONTINUATION_HPP_INCLUDED #ifndef TYPON_CORE_CONTINUATION_HPP_INCLUDED
#define TYPON_CORE_CONTINUATION_HPP_INCLUDED #define TYPON_CORE_CONTINUATION_HPP_INCLUDED
#include <atomic>
#include <concepts>
#include <coroutine> #include <coroutine>
#include <cstddef>
#include <cstdint> #include <cstdint>
#include <exception>
#include <limits>
namespace typon namespace typon
{ {
struct ForkList
{
struct Node
{
Node * _next;
std::coroutine_handle<> _coroutine;
std::exception_ptr * _exception;
};
Node * _first = nullptr;
template <typename Promise>
requires requires (Promise p)
{
{ p._exception } -> std::same_as<std::exception_ptr&>;
{ p._node } -> std::same_as<Node&>;
}
void insert(std::coroutine_handle<Promise> coroutine) noexcept
{
std::exception_ptr * exception = &(coroutine.promise()._exception);
coroutine.promise()._node = { _first, coroutine, exception };
_first = &(coroutine.promise()._node);
}
void check_exceptions()
{
for (auto node = _first; node != nullptr; node = node->_next)
{
std::exception_ptr & exception = *(node->_exception);
if (exception)
{
std::rethrow_exception(exception);
}
}
}
void clear() noexcept
{
auto next = _first;
while(next)
{
auto node = next;
next = next->_next;
node->_coroutine.destroy();
}
_first = nullptr;
}
};
struct Continuation struct Continuation
{
struct Data
{ {
using u64 = std::uint_fast64_t; using u64 = std::uint_fast64_t;
static constexpr u64 UMAX = std::numeric_limits<u64>::max();
std::coroutine_handle<> _coroutine; std::coroutine_handle<> _coroutine;
std::coroutine_handle<> _continuation;
ForkList _children;
std::atomic<u64> _n = UMAX;
u64 _thefts = 0; u64 _thefts = 0;
Data(std::coroutine_handle<> coroutine) noexcept Continuation(std::coroutine_handle<> coroutine) noexcept
: _coroutine(coroutine) : _coroutine(coroutine)
{} {}
}; };
Data * _data;
Continuation() noexcept {}
Continuation(std::nullptr_t null) noexcept : _data(null) {}
template <typename Promise>
Continuation(std::coroutine_handle<Promise> coroutine) noexcept
: _data(&(coroutine.promise()._data))
{}
auto & thefts() noexcept
{
return _data->_thefts;
}
auto & children() noexcept
{
return _data->_children;
}
auto & n() noexcept
{
return _data->_n;
}
void resume()
{
_data->_coroutine.resume();
}
operator bool() noexcept
{
return _data;
}
operator std::coroutine_handle<>() noexcept
{
return _data->_coroutine;
}
std::coroutine_handle<> continuation() noexcept
{
if (_data->_coroutine.done())
{
return _data->_continuation;
}
return _data->_coroutine;
}
};
} }
......
...@@ -4,8 +4,8 @@ ...@@ -4,8 +4,8 @@
#include <coroutine> #include <coroutine>
#include <cstdint> #include <cstdint>
#include <typon/core/continuation.hpp>
#include <typon/core/forked.hpp> #include <typon/core/forked.hpp>
#include <typon/core/parent.hpp>
#include <typon/core/result.hpp> #include <typon/core/result.hpp>
#include <typon/core/scheduler.hpp> #include <typon/core/scheduler.hpp>
...@@ -17,7 +17,7 @@ namespace typon ...@@ -17,7 +17,7 @@ namespace typon
struct [[nodiscard]] Fork struct [[nodiscard]] Fork
{ {
struct promise_type; struct promise_type;
using u64 = Continuation::Data::u64; using u64 = Parent::u64;
std::coroutine_handle<promise_type> _coroutine; std::coroutine_handle<promise_type> _coroutine;
...@@ -33,7 +33,7 @@ namespace typon ...@@ -33,7 +33,7 @@ namespace typon
struct promise_type : Result<T> struct promise_type : Result<T>
{ {
Continuation _continuation; Parent * _parent;
ForkList::Node _node; ForkList::Node _node;
Fork get_return_object() noexcept Fork get_return_object() noexcept
...@@ -52,15 +52,15 @@ namespace typon ...@@ -52,15 +52,15 @@ namespace typon
{ {
std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept
{ {
auto continuation = coroutine.promise()._continuation; auto parent = coroutine.promise()._parent;
if (Scheduler::pop()) if (Scheduler::pop())
{ {
return continuation; return parent->_coroutine;
} }
u64 n = continuation.n().fetch_sub(1, std::memory_order_acq_rel); u64 n = parent->_n.fetch_sub(1, std::memory_order_acq_rel);
if (n == 1) if (n == 1)
{ {
return continuation.continuation(); return parent->continuation();
} }
return std::noop_coroutine(); return std::noop_coroutine();
} }
...@@ -87,21 +87,22 @@ namespace typon ...@@ -87,21 +87,22 @@ namespace typon
template <typename Promise> template <typename Promise>
auto await_suspend(std::coroutine_handle<Promise> continuation) noexcept auto await_suspend(std::coroutine_handle<Promise> continuation) noexcept
{ {
_coroutine.promise()._continuation = continuation; Parent * parent = &(continuation.promise()._data);
_thefts = _coroutine.promise()._continuation.thefts(); _coroutine.promise()._parent = parent;
_thefts = parent->_thefts;
std::coroutine_handle<> on_stack_handle = _coroutine; std::coroutine_handle<> on_stack_handle = _coroutine;
Scheduler::push(continuation); Scheduler::push(parent);
return on_stack_handle; return on_stack_handle;
} }
auto await_resume() auto await_resume()
{ {
auto continuation = _coroutine.promise()._continuation; auto parent = _coroutine.promise()._parent;
bool stolen = continuation.thefts() > _thefts; bool stolen = parent->_thefts > _thefts;
if (stolen) if (stolen)
{ {
continuation.children().insert(_coroutine); parent->_children.insert(_coroutine);
} }
return Forked<T>(_coroutine, !stolen); return Forked<T>(_coroutine, !stolen);
} }
......
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
#include <coroutine> #include <coroutine>
#include <utility> #include <utility>
#include <typon/core/continuation.hpp> #include <typon/core/parent.hpp>
#include <typon/core/result.hpp> #include <typon/core/result.hpp>
...@@ -47,10 +47,10 @@ namespace typon ...@@ -47,10 +47,10 @@ namespace typon
struct promise_type : Result<T> struct promise_type : Result<T>
{ {
using u64 = Continuation::Data::u64; using u64 = Parent::u64;
static constexpr u64 UMAX = Continuation::Data::UMAX; static constexpr u64 UMAX = Parent::UMAX;
Continuation::Data _data; Parent _data;
promise_type() noexcept promise_type() noexcept
: _data(std::coroutine_handle<promise_type>::from_promise(*this)) : _data(std::coroutine_handle<promise_type>::from_promise(*this))
...@@ -76,7 +76,7 @@ namespace typon ...@@ -76,7 +76,7 @@ namespace typon
{ {
struct awaitable struct awaitable
{ {
Continuation::Data & _data; Parent & _data;
bool await_ready() noexcept bool await_ready() noexcept
{ {
...@@ -120,7 +120,7 @@ namespace typon ...@@ -120,7 +120,7 @@ namespace typon
{ {
std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept
{ {
Continuation::Data & data = coroutine.promise()._data; Parent & data = coroutine.promise()._data;
u64 thefts = data._thefts; u64 thefts = data._thefts;
if (thefts == 0) if (thefts == 0)
{ {
......
#ifndef TYPON_CORE_PARENT_HPP_INCLUDED
#define TYPON_CORE_PARENT_HPP_INCLUDED
#include <atomic>
#include <concepts>
#include <coroutine>
#include <cstdint>
#include <exception>
#include <limits>
#include <vector>
#include <typon/core/continuation.hpp>
namespace typon
{
struct ForkList
{
struct Node
{
Node * _next;
std::coroutine_handle<> _coroutine;
std::exception_ptr * _exception;
};
Node * _first = nullptr;
template <typename Promise>
requires requires (Promise p)
{
{ p._exception } -> std::same_as<std::exception_ptr&>;
{ p._node } -> std::same_as<Node&>;
}
void insert(std::coroutine_handle<Promise> coroutine) noexcept
{
std::exception_ptr * exception = &(coroutine.promise()._exception);
coroutine.promise()._node = { _first, coroutine, exception };
_first = &(coroutine.promise()._node);
}
void check_exceptions()
{
for (auto node = _first; node != nullptr; node = node->_next)
{
std::exception_ptr & exception = *(node->_exception);
if (exception)
{
std::rethrow_exception(exception);
}
}
}
void clear() noexcept
{
auto next = _first;
while(next)
{
auto node = next;
next = next->_next;
node->_coroutine.destroy();
}
_first = nullptr;
}
};
struct Parent : Continuation
{
using u64 = Continuation::u64;
static constexpr u64 UMAX = std::numeric_limits<u64>::max();
std::coroutine_handle<> _continuation;
ForkList _children;
std::atomic<u64> _n = UMAX;
Parent(std::coroutine_handle<> coroutine) noexcept
: Continuation(coroutine)
{}
void resume()
{
_coroutine.resume();
}
operator std::coroutine_handle<>() noexcept
{
return _coroutine;
}
std::coroutine_handle<> continuation() noexcept
{
if (_coroutine.done())
{
return _continuation;
}
return _coroutine;
}
};
}
#endif // TYPON_CORE_PARENT_HPP_INCLUDED
...@@ -59,7 +59,7 @@ namespace typon ...@@ -59,7 +59,7 @@ namespace typon
get()._notifyer.notify_one(); get()._notifyer.notify_one();
} }
static void push(Continuation task) noexcept static void push(Continuation * task) noexcept
{ {
get()._stack[thread_id]->push(task); get()._stack[thread_id]->push(task);
} }
...@@ -179,8 +179,8 @@ namespace typon ...@@ -179,8 +179,8 @@ namespace typon
{ {
if (auto task = stack->steal()) if (auto task = stack->steal())
{ {
task.thefts()++; task->_thefts++;
coroutine = task; coroutine = task->_coroutine;
} }
return; return;
} }
...@@ -188,8 +188,8 @@ namespace typon ...@@ -188,8 +188,8 @@ namespace typon
{ {
if (auto task = stack->pop_top()) if (auto task = stack->pop_top())
{ {
task.thefts()++; task->_thefts++;
coroutine = task; coroutine = task->_coroutine;
return; return;
} }
if (stack->_state.compare_exchange_strong(state, Stack::EMPTY)) if (stack->_state.compare_exchange_strong(state, Stack::EMPTY))
......
...@@ -9,13 +9,15 @@ ...@@ -9,13 +9,15 @@
#include <typon/fundamental/ring_buffer.hpp> #include <typon/fundamental/ring_buffer.hpp>
#include <typon/core/continuation.hpp>
namespace typon namespace typon
{ {
struct Stack struct Stack
{ {
using ring_buffer = fdt::lock_free::ring_buffer<Continuation>; using ring_buffer = fdt::lock_free::ring_buffer<Continuation *>;
using u64 = ring_buffer::u64; using u64 = ring_buffer::u64;
using enum std::memory_order; using enum std::memory_order;
...@@ -38,7 +40,7 @@ namespace typon ...@@ -38,7 +40,7 @@ namespace typon
delete _buffer.load(relaxed); delete _buffer.load(relaxed);
} }
void push(Continuation x) noexcept void push(Continuation * x) noexcept
{ {
u64 bottom = _bottom.load(relaxed); u64 bottom = _bottom.load(relaxed);
u64 top = _top.load(acquire); u64 top = _top.load(acquire);
...@@ -73,7 +75,7 @@ namespace typon ...@@ -73,7 +75,7 @@ namespace typon
return false; return false;
} }
Continuation steal() noexcept Continuation * steal() noexcept
{ {
u64 top = _top.load(acquire); u64 top = _top.load(acquire);
std::atomic_thread_fence(seq_cst); std::atomic_thread_fence(seq_cst);
...@@ -81,22 +83,22 @@ namespace typon ...@@ -81,22 +83,22 @@ namespace typon
ring_buffer * buffer = _buffer.load(consume); ring_buffer * buffer = _buffer.load(consume);
if (top < bottom) if (top < bottom)
{ {
Continuation x = buffer->get(top); Continuation * x = buffer->get(top);
if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed))
{ {
return { nullptr }; return nullptr;
} }
return x; return x;
} }
return { nullptr }; return nullptr;
} }
Continuation pop_top() noexcept Continuation * pop_top() noexcept
{ {
u64 top = _top.load(relaxed); u64 top = _top.load(relaxed);
u64 bottom = _bottom.load(relaxed); u64 bottom = _bottom.load(relaxed);
auto buffer = _buffer.load(relaxed); auto buffer = _buffer.load(relaxed);
Continuation x { nullptr }; Continuation * x = nullptr;
if (top < bottom) if (top < bottom)
{ {
x = buffer->get(top); x = buffer->get(top);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment