Commit 0d4954ae authored by Xavier Thompson's avatar Xavier Thompson

ring_buffer.hpp: hold onto smaller arrays when growing

parent 310aaa0c
...@@ -36,20 +36,13 @@ namespace typon ...@@ -36,20 +36,13 @@ namespace typon
static void push(Continuation task) noexcept static void push(Continuation task) noexcept
{ {
Scheduler & scheduler = get(); get()._deque[thread_id].push(task);
if (auto garbage = scheduler._deque[thread_id].push(task))
{
scheduler._gc.retire(garbage);
}
} }
static void schedule(Continuation task) noexcept static void schedule(Continuation task) noexcept
{ {
Scheduler & scheduler = get(); Scheduler & scheduler = get();
if (auto garbage = scheduler._deque[thread_id].push(task)) scheduler._deque[thread_id].push(task);
{
scheduler._gc.retire(garbage);
}
scheduler._notifyer.notify_one(); scheduler._notifyer.notify_one();
} }
......
...@@ -44,22 +44,19 @@ namespace typon::fdt::lock_free ...@@ -44,22 +44,19 @@ namespace typon::fdt::lock_free
delete _array.load(relaxed); delete _array.load(relaxed);
} }
auto push(T x) noexcept void push(T x) noexcept
{ {
u64 bottom = _bottom.load(relaxed); u64 bottom = _bottom.load(relaxed);
u64 top = _top.load(acquire); u64 top = _top.load(acquire);
array_type * array = _array.load(relaxed); array_type * array = _array.load(relaxed);
array_type * garbage = nullptr;
if (bottom - top > array->capacity() - 1) if (bottom - top > array->capacity() - 1)
{ {
garbage = array;
array = array->grow(top, bottom); array = array->grow(top, bottom);
_array.store(array); _array.store(array);
} }
array->put(bottom, x); array->put(bottom, x);
std::atomic_thread_fence(release); std::atomic_thread_fence(release);
_bottom.store(bottom + 1, relaxed); _bottom.store(bottom + 1, relaxed);
return garbage;
} }
pop_type pop() noexcept pop_type pop() noexcept
......
...@@ -19,16 +19,22 @@ namespace typon::fdt::lock_free ...@@ -19,16 +19,22 @@ namespace typon::fdt::lock_free
using enum std::memory_order; using enum std::memory_order;
const u64 _mask; const u64 _mask;
ring_buffer * _next;
std::atomic<T> * const _array; std::atomic<T> * const _array;
ring_buffer(u8 bits) noexcept ring_buffer(u8 bits, ring_buffer * next = nullptr) noexcept
: _mask((u64(1) << bits) - 1) : _mask((u64(1) << bits) - 1)
, _next(next)
, _array(new std::atomic<T>[this->capacity()]) , _array(new std::atomic<T>[this->capacity()])
{} {}
~ring_buffer() ~ring_buffer()
{ {
delete [] _array; delete [] _array;
if (_next)
{
delete _next;
}
} }
u64 capacity() noexcept u64 capacity() noexcept
...@@ -57,18 +63,27 @@ namespace typon::fdt::lock_free ...@@ -57,18 +63,27 @@ namespace typon::fdt::lock_free
ring_buffer * grow(u64 start, u64 end) noexcept ring_buffer * grow(u64 start, u64 end) noexcept
{ {
auto buffer = new ring_buffer(std::countr_one(_mask) + 1); auto buffer = new ring_buffer(std::countr_one(_mask) + 1, this);
return fill(buffer, start, end); return fill(buffer, start, end);
} }
ring_buffer * shrink(u64 start, u64 end) noexcept ring_buffer * shrink(u64 start, u64 end) noexcept
{ {
u8 bits = std::countr_one(_mask); ring_buffer * last = nullptr;
if (bits < 2) auto next = this;
auto size = (end - start);
auto threshold = size * 2;
while (next->_next && next->_next->capacity() >= threshold)
{
last = next;
next = next->_next;
}
if (!last)
{ {
return nullptr; return nullptr;
} }
return fill(new ring_buffer(bits - 1), start, end); return fill(std::exchange(last->_next, nullptr), start, end);
} }
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment