Commit 5c44f0f7 authored by Ophélie Gagnard's avatar Ophélie Gagnard

Update the runtime. It fixes a bug (accoring to the Cython+ developers).

parent 059a839b
# Cython+ Runtime
A minimal yet efficient runtime for Cython+.
This code was written by Xavier Thompson <xavier.thompson@nexedi.com>
## Using the runtime
In your Cython+ projects, you can add this repository as a [git submodule](https://git-scm.com/book/en/v2/Git-Tools-Submodules)
at the root of your project:
```sh
git submodule add https://lab.nexedi.com/cython-plus/runtime.git
git commit -m "Register the runtime as a git submodule"
```
If you were previously vendoring the runtime, you need to remove the associated directory from the repository cache first:
```sh
git rm --cached -r runtime/
git submodule add https://lab.nexedi.com/cython-plus/runtime.git
git commit -m "Unvendor the runtime and instead register it as a git submodule"
```
This way, you can version your code with a specific version of the runtime and update it when needed.
...@@ -21,7 +21,6 @@ cdef cypclass Worker ...@@ -21,7 +21,6 @@ cdef cypclass Worker
cdef inline void * worker_function(void * arg) nogil: cdef inline void * worker_function(void * arg) nogil:
worker = <lock Worker> arg worker = <lock Worker> arg
sch = <Scheduler> <void*> worker.scheduler sch = <Scheduler> <void*> worker.scheduler
cdef int num_remaining_queues
# Wait until all the workers are ready. # Wait until all the workers are ready.
pthread_barrier_wait(&sch.barrier) pthread_barrier_wait(&sch.barrier)
while 1: while 1:
...@@ -45,6 +44,7 @@ cdef inline void * worker_function(void * arg) nogil: ...@@ -45,6 +44,7 @@ cdef inline void * worker_function(void * arg) nogil:
# Discard the empty queue and continue the main loop. # Discard the empty queue and continue the main loop.
continue continue
# The queue is not empty: reinsert it in this worker's queues. # The queue is not empty: reinsert it in this worker's queues.
with wlocked worker:
worker.queues.push_back(queue) worker.queues.push_back(queue)
# Signal that the queue is available. # Signal that the queue is available.
sem_post(&sch.num_free_queues) sem_post(&sch.num_free_queues)
...@@ -75,24 +75,23 @@ cdef cypclass Worker: ...@@ -75,24 +75,23 @@ cdef cypclass Worker:
lock SequentialMailBox steal_queue(lock self): lock SequentialMailBox steal_queue(lock self):
# Steal a queue from another worker: # Steal a queue from another worker:
# - inspect each worker in order starting at a random offset # - inspect each worker in order starting at a random offset
# - skip this worker and any worker with an empty queue list # - skip any worker with an empty queue list
# - return the last queue of the first worker with a non-empty list # - return the last queue of the first worker with a non-empty list
# - continue looping until a queue is found
cdef int i, index, num_workers, random_offset cdef int i, index, num_workers, random_offset
sch = <Scheduler> <void*> self.scheduler sch = <Scheduler> <void*> self.scheduler
num_workers = <int> sch.workers.size() num_workers = <int> sch.workers.size()
random_offset = rand() % num_workers index = rand() % num_workers
for i in range(num_workers): while True:
index = (i + random_offset) % num_workers
victim = sch.workers[index] victim = sch.workers[index]
if victim is self:
continue
with wlocked victim: with wlocked victim:
if not victim.queues.empty(): if not victim.queues.empty():
stolen_queue = victim.queues.back() stolen_queue = victim.queues.back()
victim.queues.pop_back() victim.queues.pop_back()
stolen_queue.has_worker = True
return stolen_queue return stolen_queue
return NULL index += 1
if index >= num_workers:
index = 0
int join(self): int join(self):
# Join the worker thread. # Join the worker thread.
...@@ -106,10 +105,12 @@ cdef cypclass Scheduler: ...@@ -106,10 +105,12 @@ cdef cypclass Scheduler:
atomic[int] num_pending_queues atomic[int] num_pending_queues
sem_t done sem_t done
volatile bint is_done volatile bint is_done
int num_workers
lock Scheduler __new__(alloc, int num_workers=0): lock Scheduler __new__(alloc, int num_workers=0):
self = <lock Scheduler> consume alloc() self = <lock Scheduler> consume alloc()
if num_workers == 0: num_workers = sysconf(_SC_NPROCESSORS_ONLN) if num_workers == 0: num_workers = sysconf(_SC_NPROCESSORS_ONLN)
self.num_workers = num_workers
sem_init(&self.num_free_queues, 0, 0) sem_init(&self.num_free_queues, 0, 0)
sem_init(&self.done, 0, 0) sem_init(&self.done, 0, 0)
self.num_pending_queues.store(0) self.num_pending_queues.store(0)
...@@ -136,16 +137,20 @@ cdef cypclass Scheduler: ...@@ -136,16 +137,20 @@ cdef cypclass Scheduler:
sem_destroy(&self.num_free_queues) sem_destroy(&self.num_free_queues)
sem_destroy(&self.done) sem_destroy(&self.done)
void post_queue(self, lock SequentialMailBox queue): void post_queue(lock self, lock SequentialMailBox queue):
# Add a queue to the first worker. cdef int num_workers, random_offset
main_worker = self.workers[0] sch = <Scheduler> <void*> self
with wlocked main_worker: # Add a queue to a random worker.
num_workers = <int> sch.workers.size()
random_offset = rand() % num_workers
receiver = sch.workers[random_offset]
with wlocked receiver:
queue.has_worker = True queue.has_worker = True
main_worker.queues.push_back(queue) receiver.queues.push_back(queue)
# Increment the number of non-completed queues. # Increment the number of non-completed queues.
self.num_pending_queues.fetch_add(1) sch.num_pending_queues.fetch_add(1)
# Signal that a queue is available. # Signal that a queue is available.
sem_post(&self.num_free_queues) sem_post(&sch.num_free_queues)
void finish(lock self): void finish(lock self):
# Wait until there is no more work. # Wait until there is no more work.
...@@ -217,3 +222,47 @@ cdef cypclass BatchMailBox(SequentialMailBox): ...@@ -217,3 +222,47 @@ cdef cypclass BatchMailBox(SequentialMailBox):
cdef inline ActhonResultInterface NullResult() nogil: cdef inline ActhonResultInterface NullResult() nogil:
return NULL return NULL
# Taken from:
# https://lab.nexedi.com/nexedi/cython/blob/3.0a6-cypclass/tests/run/cypclass_acthon.pyx#L66
cdef cypclass WaitResult(ActhonResultInterface):
union result_t:
int int_val
void* ptr
result_t result
sem_t semaphore
__init__(self):
self.result.ptr = NULL
sem_init(&self.semaphore, 0, 0)
__dealloc__(self):
sem_destroy(&self.semaphore)
@staticmethod
ActhonResultInterface construct():
return WaitResult()
void pushVoidStarResult(self, void* result):
self.result.ptr = result
sem_post(&self.semaphore)
void pushIntResult(self, int result):
self.result.int_val = result
sem_post(&self.semaphore)
result_t _getRawResult(const self):
# We must ensure a result exists, but we can let others access it immediately
# The cast here is a way of const-casting (we're modifying the semaphore in a const method)
sem_wait(<sem_t*> &self.semaphore)
sem_post(<sem_t*> &self.semaphore)
return self.result
void* getVoidStarResult(const self):
res = self._getRawResult()
return res.ptr
int getIntResult(const self):
res = self._getRawResult()
return res.int_val
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment