Commit b56fa50e authored by Xavier Thompson's avatar Xavier Thompson

WIP Adapt and experiment in scheduler.hpp

parent 7b88cc0c
......@@ -55,7 +55,7 @@ namespace typon
Scheduler & scheduler = get();
WorkDeque * active = scheduler._worker[thread_id]._active.load();
Task task = active->pop();
if (task.match(fdt::lock_free::deque<Continuation>::Prune))
if (task.get_flags() == fdt::lock_free::deque<Continuation>::Resize)
{
if (auto array = active->shrink())
{
......@@ -87,6 +87,7 @@ namespace typon
static void resume(WorkDeque * deque) noexcept
{
deque->_resumable.store(true);
get()._notifyer.notify_one();
}
std::atomic<uint> _actives = 0;
......@@ -161,10 +162,24 @@ namespace typon
_gc.leave(thread_id);
}
void detect_work(Work & work) noexcept
{
_gc.enter(thread_id);
for (uint id = 0; id < _concurrency; id++)
{
work = _worker[id].try_steal();
if (work)
{
break;
}
}
_gc.leave(thread_id);
}
bool wait_for_work(Work & work) noexcept
{
work = {};
while (true)
for(;;)
{
_thieves.fetch_add(1);
explore_work(work);
......@@ -177,6 +192,16 @@ namespace typon
return true;
}
auto key = _notifyer.prepare_wait();
detect_work(work);
if (work)
{
_notifyer.cancel_wait();
if (_thieves.fetch_sub(1) == 1)
{
_notifyer.notify_one();
}
return true;
}
if (_done.load())
{
_notifyer.cancel_wait();
......@@ -192,9 +217,9 @@ namespace typon
continue;
}
}
_notifyer.cancel_wait();
(void) key;
//_notifyer.wait(key);
// _notifyer.cancel_wait();
// (void) key;
_notifyer.wait(key);
}
}
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment