Commit 5d88948f authored by Xavier Thompson's avatar Xavier Thompson

Improve scheduler work adaptation

parent 3e1af24c
...@@ -23,6 +23,7 @@ namespace typon ...@@ -23,6 +23,7 @@ namespace typon
struct Scheduler struct Scheduler
{ {
using uint = unsigned int; using uint = unsigned int;
using u64 = std::uint_fast64_t;
using Work = Worker::Work; using Work = Worker::Work;
using garbage_collector = fdt::lock_free::garbage_collector; using garbage_collector = fdt::lock_free::garbage_collector;
...@@ -38,6 +39,7 @@ namespace typon ...@@ -38,6 +39,7 @@ namespace typon
{ {
uint id = fdt::random::random() % get()._concurrency; uint id = fdt::random::random() % get()._concurrency;
get()._worker[id].add(new Deque(task)); get()._worker[id].add(new Deque(task));
get()._stealables.fetch_add(1);
get()._notifyer.notify_one(); get()._notifyer.notify_one();
} }
...@@ -67,12 +69,13 @@ namespace typon ...@@ -67,12 +69,13 @@ namespace typon
{ {
uint id = fdt::random::random() % get()._concurrency; uint id = fdt::random::random() % get()._concurrency;
get()._worker[id].add(deque); get()._worker[id].add(deque);
get()._stealables.fetch_add(1);
} }
get()._notifyer.notify_one(); get()._notifyer.notify_one();
} }
std::atomic<uint> _actives = 0;
std::atomic<uint> _thieves = 0; std::atomic<uint> _thieves = 0;
std::atomic<u64> _stealables = 0;
std::vector<Worker> _worker; std::vector<Worker> _worker;
std::vector<std::thread> _thread; std::vector<std::thread> _thread;
std::atomic_bool _done {false}; std::atomic_bool _done {false};
...@@ -87,7 +90,8 @@ namespace typon ...@@ -87,7 +90,8 @@ namespace typon
{ {
for (uint i = 0; i < concurrency; i++) for (uint i = 0; i < concurrency; i++)
{ {
_worker[i]._gc = &_gc; _worker[i]._gc = &(_gc);
_worker[i]._stealables = &(_stealables);
} }
thread_id = concurrency; thread_id = concurrency;
...@@ -122,15 +126,7 @@ namespace typon ...@@ -122,15 +126,7 @@ namespace typon
void exploit_work(Work & work) noexcept void exploit_work(Work & work) noexcept
{ {
if (_actives.fetch_add(1) == 0)
{
if (_thieves.load() == 0)
{
_notifyer.notify_one();
}
}
_worker[thread_id].resume(work); _worker[thread_id].resume(work);
_actives.fetch_sub(1);
} }
void explore_work(Work & work) noexcept void explore_work(Work & work) noexcept
...@@ -147,19 +143,6 @@ namespace typon ...@@ -147,19 +143,6 @@ namespace typon
} }
} }
void detect_work(Work & work) noexcept
{
auto epoch = _gc.epoch(thread_id);
for (uint id = 0; id < _concurrency; id++)
{
work = _worker[id].steal();
if (work)
{
break;
}
}
}
bool wait_for_work(Work & work) noexcept bool wait_for_work(Work & work) noexcept
{ {
work = {}; work = {};
...@@ -176,16 +159,6 @@ namespace typon ...@@ -176,16 +159,6 @@ namespace typon
return true; return true;
} }
auto key = _notifyer.prepare_wait(); auto key = _notifyer.prepare_wait();
detect_work(work);
if (work)
{
_notifyer.cancel_wait();
if (_thieves.fetch_sub(1) == 1)
{
_notifyer.notify_one();
}
return true;
}
if (_done.load()) if (_done.load())
{ {
_notifyer.cancel_wait(); _notifyer.cancel_wait();
...@@ -195,14 +168,12 @@ namespace typon ...@@ -195,14 +168,12 @@ namespace typon
} }
if (_thieves.fetch_sub(1) == 1) if (_thieves.fetch_sub(1) == 1)
{ {
if (_actives.load() > 0) if (_stealables.load() > 0)
{ {
_notifyer.cancel_wait(); _notifyer.cancel_wait();
continue; continue;
} }
} }
// _notifyer.cancel_wait();
// (void) key;
_notifyer.wait(key); _notifyer.wait(key);
} }
} }
......
...@@ -47,6 +47,7 @@ namespace typon ...@@ -47,6 +47,7 @@ namespace typon
std::mutex _mutex; std::mutex _mutex;
std::atomic<Deque *> _deque {nullptr}; std::atomic<Deque *> _deque {nullptr};
std::vector<Deque *> _pool; std::vector<Deque *> _pool;
std::atomic_uint_fast64_t * _stealables;
fdt::lock_free::garbage_collector * _gc; fdt::lock_free::garbage_collector * _gc;
~Worker() ~Worker()
...@@ -88,24 +89,30 @@ namespace typon ...@@ -88,24 +89,30 @@ namespace typon
void resume(Work & work) noexcept void resume(Work & work) noexcept
{ {
auto deque = _deque.load();
if (work._state == Work::Resumable) if (work._state == Work::Resumable)
{ {
auto deque = _deque.load();
_deque.store(work._deque); _deque.store(work._deque);
if (deque) if (deque)
{ {
_gc->retire(deque); _gc->retire(deque);
} }
_stealables->fetch_add(1);
work._deque->resume(); work._deque->resume();
} }
else else
{ {
if (!deque) if (!_deque.load())
{ {
_deque.store(new Deque()); _deque.store(new Deque());
} }
_stealables->fetch_add(1);
work._task.resume(); work._task.resume();
} }
if (_deque.load())
{
_stealables->fetch_sub(1);
}
} }
void push(Continuation task) noexcept void push(Continuation task) noexcept
...@@ -177,6 +184,7 @@ namespace typon ...@@ -177,6 +184,7 @@ namespace typon
{ {
return deque; return deque;
} }
_stealables->fetch_sub(1);
return {}; return {};
} }
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment