Commit 3e40a14d authored by Xavier Thompson

Improve scheduler implementation

- Add Scheduler.join() method
- Allow for multiple root tasks
- Allow for no task at all
parent 27d45900
...@@ -26,8 +26,8 @@ cdef inline void * worker_function(void * arg) nogil: ...@@ -26,8 +26,8 @@ cdef inline void * worker_function(void * arg) nogil:
while 1: while 1:
# Wait until a queue becomes available. # Wait until a queue becomes available.
sem_wait(&sch.num_free_queues) sem_wait(&sch.num_free_queues)
# If the scheduler is done there is nothing to do anymore. # If the scheduler is finished there is nothing to do anymore.
if sch.is_done: if sch.is_finished:
return <void*> 0 return <void*> 0
# Pop or steal a queue. # Pop or steal a queue.
queue = worker.get_queue() queue = worker.get_queue()
...@@ -40,7 +40,7 @@ cdef inline void * worker_function(void * arg) nogil: ...@@ -40,7 +40,7 @@ cdef inline void * worker_function(void * arg) nogil:
# Decrement the number of non-completed queues. # Decrement the number of non-completed queues.
if sch.num_pending_queues.fetch_sub(1) == 1: if sch.num_pending_queues.fetch_sub(1) == 1:
# Signal that there are no more queues. # Signal that there are no more queues.
sem_post(&sch.done) sem_post(&sch.is_idle)
# Discard the empty queue and continue the main loop. # Discard the empty queue and continue the main loop.
continue continue
# The queue is not empty: reinsert it in this worker's queues. # The queue is not empty: reinsert it in this worker's queues.
...@@ -103,8 +103,8 @@ cdef cypclass Scheduler: ...@@ -103,8 +103,8 @@ cdef cypclass Scheduler:
pthread_barrier_t barrier pthread_barrier_t barrier
sem_t num_free_queues sem_t num_free_queues
atomic[int] num_pending_queues atomic[int] num_pending_queues
sem_t done sem_t is_idle
volatile bint is_done volatile bint is_finished
int num_workers int num_workers
lock Scheduler __new__(alloc, int num_workers=0): lock Scheduler __new__(alloc, int num_workers=0):
...@@ -112,20 +112,17 @@ cdef cypclass Scheduler: ...@@ -112,20 +112,17 @@ cdef cypclass Scheduler:
if num_workers == 0: num_workers = sysconf(_SC_NPROCESSORS_ONLN) if num_workers == 0: num_workers = sysconf(_SC_NPROCESSORS_ONLN)
self.num_workers = num_workers self.num_workers = num_workers
sem_init(&self.num_free_queues, 0, 0) sem_init(&self.num_free_queues, 0, 0)
sem_init(&self.done, 0, 0) # Initially the scheduler is idle but not finished
self.is_finished = False
sem_init(&self.is_idle, 0, 1)
self.num_pending_queues.store(0) self.num_pending_queues.store(0)
if pthread_barrier_init(&self.barrier, NULL, num_workers + 1): if pthread_barrier_init(&self.barrier, NULL, num_workers + 1):
printf("Could not allocate memory for the thread barrier\n") printf("Could not allocate memory for the thread barrier\n")
# Signal that no work will be done.
sem_post(&self.done)
return self return self
self.is_done = False
self.workers.reserve(num_workers) self.workers.reserve(num_workers)
for i in range(num_workers): for i in range(num_workers):
worker = Worker(self) worker = Worker(self)
if worker is NULL: if worker is NULL:
# Signal that no work will be done.
sem_post(&self.done)
return self return self
self.workers.push_back(worker) self.workers.push_back(worker)
# Wait until all the worker threads are ready. # Wait until all the worker threads are ready.
...@@ -135,11 +132,16 @@ cdef cypclass Scheduler: ...@@ -135,11 +132,16 @@ cdef cypclass Scheduler:
__dealloc__(self): __dealloc__(self):
pthread_barrier_destroy(&self.barrier) pthread_barrier_destroy(&self.barrier)
sem_destroy(&self.num_free_queues) sem_destroy(&self.num_free_queues)
sem_destroy(&self.done) sem_destroy(&self.is_idle)
void post_queue(lock self, lock SequentialMailBox queue): void post_queue(lock self, lock SequentialMailBox queue):
cdef int num_workers, random_offset cdef int num_workers, random_offset, num_previous_queues
sch = <Scheduler> <void*> self sch = <Scheduler> <void*> self
# Increment the number of non-completed queues.
num_previous_queues = sch.num_pending_queues.fetch_add(1)
if num_previous_queues == 0:
# Signal that the scheduler is not idle.
sem_wait(&self.is_idle)
# Add a queue to a random worker. # Add a queue to a random worker.
num_workers = <int> sch.workers.size() num_workers = <int> sch.workers.size()
random_offset = rand() % num_workers random_offset = rand() % num_workers
...@@ -147,17 +149,20 @@ cdef cypclass Scheduler: ...@@ -147,17 +149,20 @@ cdef cypclass Scheduler:
with wlocked receiver: with wlocked receiver:
queue.has_worker = True queue.has_worker = True
receiver.queues.push_back(queue) receiver.queues.push_back(queue)
# Increment the number of non-completed queues.
sch.num_pending_queues.fetch_add(1)
# Signal that a queue is available. # Signal that a queue is available.
sem_post(&sch.num_free_queues) sem_post(&sch.num_free_queues)
void join(lock self):
# Wait until the scheduler is idle.
is_idle = &self.is_idle
sem_wait(is_idle)
sem_post(is_idle)
void finish(lock self): void finish(lock self):
# Wait until there is no more work. # Wait until all current tasks are joined
done = &self.done self.join()
sem_wait(done)
# Signal the worker threads that there is no more work. # Signal the worker threads that there is no more work.
self.is_done = True self.is_finished = True
# Pretend that there are new queues to wake up the workers. # Pretend that there are new queues to wake up the workers.
num_free_queues = &self.num_free_queues num_free_queues = &self.num_free_queues
for worker in self.workers: for worker in self.workers:
...@@ -179,6 +184,7 @@ cdef cypclass SequentialMailBox(ActhonQueueInterface): ...@@ -179,6 +184,7 @@ cdef cypclass SequentialMailBox(ActhonQueueInterface):
return self.messages.empty() return self.messages.empty()
void push(locked self, ActhonMessageInterface message): void push(locked self, ActhonMessageInterface message):
cdef bint has_worker
# Add a task to the queue. # Add a task to the queue.
self.messages.push_back(message) self.messages.push_back(message)
if message._sync_method is not NULL: if message._sync_method is not NULL:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment