Commit ff8fe1f5 authored by Xavier Thompson's avatar Xavier Thompson

WIP: Add gc.hpp

parent 2b594b49
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include <typon/fundamental/deque.hpp> #include <typon/fundamental/deque.hpp>
#include <typon/fundamental/event_count.hpp> #include <typon/fundamental/event_count.hpp>
#include <typon/fundamental/gc.hpp>
#include <typon/fundamental/optional.hpp> #include <typon/fundamental/optional.hpp>
#include <typon/fundamental/random.hpp> #include <typon/fundamental/random.hpp>
...@@ -23,17 +24,10 @@ namespace typon ...@@ -23,17 +24,10 @@ namespace typon
using uint = unsigned int; using uint = unsigned int;
using Deque = fdt::lock_free::deque<Continuation>; using Deque = fdt::lock_free::deque<Continuation>;
using Task = typename Deque::pop_type; using Task = typename Deque::pop_type;
using GC = fdt::lock_free::gc;
static inline thread_local uint thread_id; static inline thread_local uint thread_id;
std::atomic<uint> _actives = 0;
std::atomic<uint> _thieves = 0;
std::vector<Deque> _deque;
std::vector<std::thread> _thread;
std::atomic_bool _done {false};
fdt::lock_free::event_count<> _notifyer;
const uint _concurrency;
static Scheduler & get() noexcept static Scheduler & get() noexcept
{ {
static Scheduler scheduler {std::thread::hardware_concurrency()}; static Scheduler scheduler {std::thread::hardware_concurrency()};
...@@ -42,24 +36,51 @@ namespace typon ...@@ -42,24 +36,51 @@ namespace typon
static void push(Continuation task) noexcept static void push(Continuation task) noexcept
{ {
get()._deque[thread_id].push(task); Scheduler & scheduler = get();
if (auto garbage = scheduler._deque[thread_id].push(task))
{
scheduler._gc.retire(garbage);
}
} }
static void schedule(Continuation task) noexcept static void schedule(Continuation task) noexcept
{ {
Scheduler & scheduler = get(); Scheduler & scheduler = get();
scheduler._deque[thread_id].push(task); if (auto garbage = scheduler._deque[thread_id].push(task))
{
scheduler._gc.retire(garbage);
}
scheduler._notifyer.notify_one(); scheduler._notifyer.notify_one();
} }
static Task pop() noexcept static Task pop() noexcept
{ {
return get()._deque[thread_id].pop(); Scheduler & scheduler = get();
Deque & deque = scheduler._deque[thread_id];
Task task = deque.pop();
if (task.match(Deque::Compress) || task.match(Deque::Prune))
{
if (auto array = deque.shrink())
{
scheduler._gc.retire(array);
}
}
return task;
} }
std::atomic<uint> _actives = 0;
std::atomic<uint> _thieves = 0;
std::vector<Deque> _deque;
std::vector<std::thread> _thread;
std::atomic_bool _done {false};
fdt::lock_free::event_count<> _notifyer;
const uint _concurrency;
GC _gc;
Scheduler(uint concurrency) noexcept Scheduler(uint concurrency) noexcept
: _deque(concurrency + 1) : _deque(concurrency + 1)
, _concurrency(concurrency) , _concurrency(concurrency)
, _gc(concurrency)
{ {
thread_id = concurrency; thread_id = concurrency;
...@@ -117,6 +138,7 @@ namespace typon ...@@ -117,6 +138,7 @@ namespace typon
void explore_task(Task & task) noexcept void explore_task(Task & task) noexcept
{ {
_gc.enter(thread_id);
for (uint i = 0; i < _concurrency * 2 + 1; i++) for (uint i = 0; i < _concurrency * 2 + 1; i++)
{ {
uint id = fdt::random::random() % _concurrency; uint id = fdt::random::random() % _concurrency;
...@@ -134,6 +156,7 @@ namespace typon ...@@ -134,6 +156,7 @@ namespace typon
break; break;
} }
} }
_gc.leave(thread_id);
} }
bool wait_for_task(Task & task) noexcept bool wait_for_task(Task & task) noexcept
......
...@@ -19,13 +19,15 @@ namespace typon::fdt::lock_free ...@@ -19,13 +19,15 @@ namespace typon::fdt::lock_free
struct deque struct deque
{ {
using array_type = ring_buffer<T>; using array_type = ring_buffer<T>;
using pop_type = fdt::optional<T, 2>; using pop_type = fdt::optional<T>;
using u8 = typename ring_buffer<T>::u8; using u8 = typename ring_buffer<T>::u8;
using u64 = typename ring_buffer<T>::u64; using u64 = typename ring_buffer<T>::u64;
static constexpr typename pop_type::template state<0> Empty {}; static constexpr typename pop_type::template state<0> Empty {};
static constexpr typename pop_type::template state<1> Abort {}; static constexpr typename pop_type::template state<2> Abort {};
static constexpr typename pop_type::template state<4> Compress {};
static constexpr typename pop_type::template state<3> Prune {};
using enum std::memory_order; using enum std::memory_order;
...@@ -42,19 +44,22 @@ namespace typon::fdt::lock_free ...@@ -42,19 +44,22 @@ namespace typon::fdt::lock_free
delete _array.load(relaxed); delete _array.load(relaxed);
} }
void push(T x) noexcept auto push(T x) noexcept
{ {
u64 bottom = _bottom.load(relaxed); u64 bottom = _bottom.load(relaxed);
u64 top = _top.load(acquire); u64 top = _top.load(acquire);
array_type * array = _array.load(relaxed); array_type * array = _array.load(relaxed);
array_type * garbage = nullptr;
if (bottom - top > array->capacity() - 1) if (bottom - top > array->capacity() - 1)
{ {
garbage = array;
array = array->grow(top, bottom); array = array->grow(top, bottom);
_array.store(array); _array.store(array);
} }
array->put(bottom, x); array->put(bottom, x);
std::atomic_thread_fence(release); std::atomic_thread_fence(release);
_bottom.store(bottom + 1, relaxed); _bottom.store(bottom + 1, relaxed);
return garbage;
} }
pop_type pop() noexcept pop_type pop() noexcept
...@@ -64,7 +69,16 @@ namespace typon::fdt::lock_free ...@@ -64,7 +69,16 @@ namespace typon::fdt::lock_free
_bottom.store(bottom, relaxed); _bottom.store(bottom, relaxed);
std::atomic_thread_fence(seq_cst); std::atomic_thread_fence(seq_cst);
u64 top = _top.load(relaxed); u64 top = _top.load(relaxed);
pop_type x { Empty }; u64 capacity = array->capacity();
pop_type x;
if (capacity > 16)
{
x = { Compress };
}
else
{
x = { Empty };
}
if (top <= bottom) if (top <= bottom)
{ {
x = array->get(bottom); x = array->get(bottom);
...@@ -72,10 +86,24 @@ namespace typon::fdt::lock_free ...@@ -72,10 +86,24 @@ namespace typon::fdt::lock_free
{ {
if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed))
{ {
x = { Empty }; if (capacity > 16)
{
x = { Compress };
}
else
{
x = { Empty };
}
} }
_bottom.store(bottom + 1, relaxed); _bottom.store(bottom + 1, relaxed);
} }
if (capacity > 16 && bottom - top > capacity / 4)
{
if (x)
{
x = { *x, Prune };
}
}
} }
else else
{ {
...@@ -84,6 +112,20 @@ namespace typon::fdt::lock_free ...@@ -84,6 +112,20 @@ namespace typon::fdt::lock_free
return x; return x;
} }
array_type * shrink() noexcept
{
u64 bottom = _bottom.load(relaxed);
array_type * array = _array.load(relaxed);
u64 top = _top.load(relaxed);
array_type * next = array->shrink(top, bottom);
if (next)
{
_array.store(next, relaxed);
return array;
}
return nullptr;
}
pop_type steal() noexcept pop_type steal() noexcept
{ {
u64 top = _top.load(acquire); u64 top = _top.load(acquire);
......
#ifndef TYPON_FUNDAMENTAL_GC_HPP_INCLUDED
#define TYPON_FUNDAMENTAL_GC_HPP_INCLUDED
#include <atomic>
#include <bit>
#include <cstdint>
#include <deque>
#include <type_traits>
#include <typon/fundamental/meta.hpp>
#include <typon/fundamental/ring_buffer.hpp>
namespace typon::fdt::lock_free
{
struct gc
{
using u64 = std::uint_fast64_t;
using uint = unsigned int;
using enum std::memory_order;
struct node
{
const u64 _stamp;
std::atomic<node *> _next {nullptr};
node(u64 stamp) noexcept : _stamp(stamp) {};
virtual ~node() {}
};
template <typename T>
struct garbage : node
{
T * _ptr;
garbage(T * ptr, u64 stamp) noexcept
: node(stamp)
, _ptr(ptr)
{}
virtual ~garbage()
{
delete _ptr;
}
};
const uint _concurrency;
std::atomic<u64> * const _stamps;
std::atomic<u64> _stamp {0};
std::atomic<node *> _head {nullptr};
std::atomic<node *> _tail {nullptr};
gc(uint concurrency) noexcept
: _concurrency(concurrency)
, _stamps(new std::atomic<u64>[concurrency])
{
for (uint i = 0; i < _concurrency; i++)
{
_stamps[i].store(u64(-1));
}
}
void enter(uint id) noexcept
{
_stamps[id].store(_stamp.fetch_add(1));
}
template <typename T>
void retire(T * ptr) noexcept
{
auto stamp = _stamp.load();
auto node = new garbage<T> { ptr, stamp };
auto head = _head.exchange(node);
if (head)
{
head->_next.store(node);
}
else
{
_tail.store(node);
}
}
void leave(uint id) noexcept
{
_stamps[id].store(u64(-1));
if (_tail.load())
{
reclaim(oldest());
}
}
u64 oldest() noexcept
{
u64 oldest = u64(-1);
for (uint i = 0; i < _concurrency; i++)
{
u64 stamp = _stamps[i].load(relaxed);
if (stamp < oldest)
{
oldest = stamp;
}
}
return oldest;
}
void reclaim(u64 oldest) noexcept
{
while (auto tail = _tail.load())
{
if (tail->_stamp >= oldest || !tail->_next)
{
break;
}
if (_tail.compare_exchange_strong(tail, tail->_next))
{
delete tail;
}
}
}
};
}
#endif // TYPON_FUNDAMENTAL_GC_HPP_INCLUDED
...@@ -12,6 +12,9 @@ namespace typon::fdt::meta ...@@ -12,6 +12,9 @@ namespace typon::fdt::meta
using type = T<Args...>; using type = T<Args...>;
}; };
template <typename... T>
static constexpr bool always_false_v { false };
} }
......
...@@ -7,16 +7,19 @@ ...@@ -7,16 +7,19 @@
namespace typon::fdt namespace typon::fdt
{ {
template <typename T, unsigned char N> template <typename T>
requires std::is_trivially_copyable_v<T> requires std::is_trivially_copyable_v<T>
struct optional struct optional
{ {
static_assert(N > 0, "N must be greater than 0");
template <unsigned char I> template <unsigned char I>
requires (I <= N)
using state = std::integral_constant<unsigned char, I>; using state = std::integral_constant<unsigned char, I>;
template <unsigned char I>
using empty_state = state<2 * I>;
template <unsigned char I>
using engaged_state = state<2 * I + 1>;
unsigned char _state; unsigned char _state;
union union
{ {
...@@ -26,14 +29,18 @@ namespace typon::fdt ...@@ -26,14 +29,18 @@ namespace typon::fdt
optional() noexcept : _state(0) {} optional() noexcept : _state(0) {}
template <unsigned char I> template <unsigned char I>
requires (I < N) requires (!(I & 1))
optional(state<I> state) noexcept : _state(state) {} optional(state<I> state) noexcept : _state(state) {}
optional(T value) noexcept : _state(N), _value(value) {} optional(T value) noexcept : _state(1), _value(value) {}
template <unsigned char I>
requires (bool(I & 1))
optional(T value, state<I> state) noexcept : _state(state), _value(value) {}
~optional() ~optional()
{ {
if (_state == N) if (_state & 1)
{ {
std::destroy_at(std::addressof(_value)); std::destroy_at(std::addressof(_value));
} }
...@@ -41,7 +48,7 @@ namespace typon::fdt ...@@ -41,7 +48,7 @@ namespace typon::fdt
operator bool() noexcept operator bool() noexcept
{ {
return _state == N; return _state & 1;
} }
template <unsigned char I> template <unsigned char I>
......
...@@ -19,22 +19,16 @@ namespace typon::fdt::lock_free ...@@ -19,22 +19,16 @@ namespace typon::fdt::lock_free
using enum std::memory_order; using enum std::memory_order;
const u64 _mask; const u64 _mask;
ring_buffer * _next;
std::atomic<T> * const _array; std::atomic<T> * const _array;
ring_buffer(u8 bits, ring_buffer * next = nullptr) noexcept ring_buffer(u8 bits) noexcept
: _mask((u64(1) << bits) - 1) : _mask((u64(1) << bits) - 1)
, _next(next)
, _array(new std::atomic<T>[this->capacity()]) , _array(new std::atomic<T>[this->capacity()])
{} {}
~ring_buffer() ~ring_buffer()
{ {
delete [] _array; delete [] _array;
if (_next)
{
delete _next;
}
} }
u64 capacity() noexcept u64 capacity() noexcept
...@@ -63,13 +57,19 @@ namespace typon::fdt::lock_free ...@@ -63,13 +57,19 @@ namespace typon::fdt::lock_free
ring_buffer * grow(u64 start, u64 end) noexcept ring_buffer * grow(u64 start, u64 end) noexcept
{ {
auto buffer = new ring_buffer(std::countr_one(_mask) + 1, this); auto buffer = new ring_buffer(std::countr_one(_mask) + 1);
return fill(buffer, start, end); return fill(buffer, start, end);
} }
ring_buffer * shrink(u64 start, u64 end) noexcept ring_buffer * shrink(u64 start, u64 end) noexcept
{ {
return fill(std::exchange(_next, nullptr), start, end); return nullptr;
u8 bits = std::countr_one(_mask);
if (bits < 3)
{
return nullptr;
}
return fill(new ring_buffer(bits - 1), start, end);
} }
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment