Commit 28c651ab authored by Kevin Modzelewski's avatar Kevin Modzelewski

Make GC collections work when threads are in generators

The previous code assumed that there was only one stack per
thread, which could be examined by looking at the current registers.
Generators complicate that, since we switch to a separate stack.
We still need to scan the original, main stack, which means that
we need to add some bookkeeping code that remembers what the main
stack was, even as we swap to and from generators.
parent 7e86fc1a
......@@ -40,31 +40,88 @@ int tgkill(int tgid, int tid, int sig) {
return syscall(SYS_tgkill, tgid, tid, sig);
}
PthreadFastMutex threading_lock;
// Certain thread examination functions won't be valid for a brief
// period while a thread is starting up.
// To handle this, track the number of threads in an uninitialized state,
// and wait until they start up.
// As a minor optimization, this is not a std::atomic since it should only
// be checked while the threading_lock is held; might not be worth it.
int num_starting_threads(0);
PthreadFastMutex threading_lock;
struct ThreadInfo {
// "bottom" in the sense of a stack, which in a down-growing stack is the highest address:
class ThreadStateInternal {
private:
bool saved;
ucontext_t ucontext;
ucontext_t* context_from_generator;
int generator_depth;
public:
void* stack_bottom;
pthread_t pthread_id;
};
static std::unordered_map<pid_t, ThreadInfo> current_threads;
struct ThreadStateInternal {
bool valid;
ucontext_t ucontext;
ThreadStateInternal(void* stack_bottom, pthread_t pthread_id)
: saved(false), generator_depth(0), stack_bottom(stack_bottom), pthread_id(pthread_id) {}
void saveCurrent() {
assert(!saved);
if (generator_depth == 0) {
getcontext(&ucontext);
}
saved = true;
}
void popCurrent() {
assert(saved);
saved = false;
}
bool isValid() { return saved || generator_depth; }
ucontext_t* getContext() {
if (generator_depth)
return context_from_generator;
return &ucontext;
}
void pushGenerator(ucontext_t* prev_context) {
if (generator_depth == 0)
context_from_generator = prev_context;
generator_depth++;
}
ThreadStateInternal() : valid(false) {}
void popGenerator() {
generator_depth--;
assert(generator_depth >= 0);
}
void assertNoGenerators() { assert(generator_depth == 0); }
friend void* getStackTop();
};
static std::unordered_map<pid_t, ThreadStateInternal> saved_thread_states;
static std::unordered_map<pid_t, ThreadStateInternal*> current_threads;
// TODO could optimize these by keeping a __thread local reference to current_threads[gettid()]
void* getStackBottom() {
return current_threads[gettid()].stack_bottom;
return current_threads[gettid()]->stack_bottom;
}
void* getStackTop() {
ThreadStateInternal* state = current_threads[gettid()];
int depth = state->generator_depth;
if (depth == 0) {
return __builtin_frame_address(0);
}
return (void*)state->context_from_generator->uc_mcontext.gregs[REG_RSP];
}
void pushGenerator(ucontext_t* prev_context) {
current_threads[gettid()]->pushGenerator(prev_context);
}
void popGenerator() {
current_threads[gettid()]->popGenerator();
}
static int signals_waiting(0);
......@@ -73,9 +130,9 @@ static std::vector<ThreadState> thread_states;
static void pushThreadState(pid_t tid, ucontext_t* context) {
#if STACK_GROWS_DOWN
void* stack_start = (void*)context->uc_mcontext.gregs[REG_RSP];
void* stack_end = current_threads[tid].stack_bottom;
void* stack_end = current_threads[tid]->stack_bottom;
#else
void* stack_start = current_threads[tid].stack_bottom;
void* stack_start = current_threads[tid]->stack_bottom;
void* stack_end = (void*)(context->uc_mcontext.gregs[REG_RSP] + sizeof(void*));
#endif
assert(stack_start < stack_end);
......@@ -105,32 +162,36 @@ std::vector<ThreadState> getAllThreadStates() {
// Current strategy:
// Let the other threads decide whether they want to cooperate and save their state before we get here.
// If they did save their state (as indicated by saved_thread_states[tid].valid), then we use that.
// If they did save their state (as indicated by current_threads[tid]->isValid), then we use that.
// Otherwise, we send them a signal and use the signal handler to look at their thread state.
pid_t tgid = getpid();
pid_t mytid = gettid();
for (auto& pair : current_threads) {
pid_t tid = pair.first;
ThreadStateInternal* state = pair.second;
if (tid == mytid)
continue;
// TODO I'm pretty skeptical about this... are we really guaranteed that this is still valid?
// (in the non-generator case where the thread saved its own state)
// ex what if an object pointer got pushed onto the stack, below where we thought the stack
// ended. We might be able to handle that case by examining the entire stack region, but are
// there other issues as well?
if (saved_thread_states[tid].valid) {
pushThreadState(tid, &saved_thread_states[tid].ucontext);
if (state->isValid()) {
pushThreadState(tid, state->getContext());
signals_waiting--;
continue;
}
if (tid == mytid)
continue;
tgkill(tgid, tid, SIGUSR2);
}
// TODO shouldn't busy-wait:
while (signals_waiting) {
threading_lock.unlock();
// printf("Waiting for %d threads\n", signals_waiting);
sleep(0);
threading_lock.lock();
}
......@@ -169,10 +230,10 @@ static void* _thread_start(void* _arg) {
Box* arg3 = arg->arg3;
delete arg;
pid_t tid = gettid();
{
LOCK_REGION(&threading_lock);
pid_t tid = gettid();
pthread_t current_thread = pthread_self();
pthread_attr_t thread_attrs;
......@@ -187,15 +248,12 @@ static void* _thread_start(void* _arg) {
pthread_attr_destroy(&thread_attrs);
current_threads[tid] = ThreadInfo {
#if STACK_GROWS_DOWN
.stack_bottom = static_cast<char*>(stack_start) + stack_size,
void* stack_bottom = static_cast<char*>(stack_start) + stack_size;
#else
.stack_bottom = stack_start,
void* stack_bottom = stack_start;
#endif
.pthread_id = current_thread,
};
saved_thread_states[tid] = ThreadStateInternal();
current_threads[tid] = new ThreadStateInternal(stack_bottom, current_thread);
num_starting_threads--;
......@@ -206,12 +264,12 @@ static void* _thread_start(void* _arg) {
threading::GLReadRegion _glock;
void* rtn = start_func(arg1, arg2, arg3);
current_threads[tid]->assertNoGenerators();
{
LOCK_REGION(&threading_lock);
current_threads.erase(gettid());
saved_thread_states.erase(gettid());
if (VERBOSITY() >= 2)
printf("thread tid=%d exited\n", gettid());
}
......@@ -284,9 +342,7 @@ static void* find_stack() {
void registerMainThread() {
LOCK_REGION(&threading_lock);
current_threads[gettid()] = ThreadInfo{
.stack_bottom = find_stack(), .pthread_id = pthread_self(),
};
current_threads[gettid()] = new ThreadStateInternal(find_stack(), pthread_self());
struct sigaction act;
act.sa_flags = SA_SIGINFO;
......@@ -298,6 +354,12 @@ void registerMainThread() {
err(1, NULL);
}
void finishMainThread() {
current_threads[gettid()]->assertNoGenerators();
// TODO maybe this is the place to wait for non-daemon threads?
}
// For the "AllowThreads" regions, let's save the thread state at the beginning of the region.
// This means that the thread won't get interrupted by the signals we would otherwise need to
......@@ -313,17 +375,15 @@ GLAllowThreadsReadRegion::GLAllowThreadsReadRegion() {
{
LOCK_REGION(&threading_lock);
ThreadStateInternal& state = saved_thread_states[gettid()];
assert(!state.valid);
getcontext(&state.ucontext);
state.valid = true;
ThreadStateInternal* state = current_threads[gettid()];
state->saveCurrent();
}
}
GLAllowThreadsReadRegion::~GLAllowThreadsReadRegion() {
{
LOCK_REGION(&threading_lock);
saved_thread_states[gettid()].valid = false;
current_threads[gettid()]->popCurrent();
}
......
......@@ -32,18 +32,19 @@ namespace threading {
intptr_t start_thread(void* (*start_func)(Box*, Box*, Box*), Box* arg1, Box* arg2, Box* arg3);
void registerMainThread();
void finishMainThread();
struct ThreadState {
pid_t tid; // useful mostly for debugging
ucontext_t ucontext;
ucontext_t* ucontext;
// start and end (start < end) of the threads main stack.
// The thread may not be actually executing on that stack, since it may be
// in a generator, but those generators will be tracked separately.
void* stack_start, *stack_end;
ThreadState(pid_t tid, ucontext_t* ucontext, void* stack_start, void* stack_end)
: tid(tid), stack_start(stack_start), stack_end(stack_end) {
memcpy(&this->ucontext, ucontext, sizeof(ucontext_t));
this->ucontext.uc_mcontext.fpregs = &this->ucontext.__fpregs_mem;
}
: tid(tid), ucontext(ucontext), stack_start(stack_start), stack_end(stack_end) {}
};
// Gets a ThreadState per thread, not including the thread calling this function.
// For this call to make sense, the threads all should be blocked;
......@@ -53,7 +54,16 @@ std::vector<ThreadState> getAllThreadStates();
// Get the stack "bottom" (ie first pushed data. For stacks that grow down, this
// will be the highest address).
void* getStackBottom();
void* getStackTop();
// We need to track the state of the thread's main stack. This can get complicated when
// generators are involved, so we add some hooks for the generator code to notify the threading
// code that it has switched onto of off of a generator.
// A generator should call pushGenerator() when it gets switched to, with a pointer to the context
// that it will return to (ie the context of the thing that called the generator).
// The generator should call popGenerator() when it is about to switch back to the caller.
void pushGenerator(ucontext_t* prev_context);
void popGenerator();
#ifndef THREADING_USE_GIL
......
......@@ -51,7 +51,7 @@ void collectOtherThreadsStacks(TraceStack* stack) {
for (threading::ThreadState& tstate : threads) {
collectRoots(tstate.stack_start, tstate.stack_end, stack);
collectRoots(&tstate.ucontext, (&tstate.ucontext) + 1, stack);
collectRoots(tstate.ucontext, tstate.ucontext + 1, stack);
}
}
......@@ -69,10 +69,11 @@ static void collectLocalStack(TraceStack* stack) {
collectRoots(&registers, (&registers) + 1, stack);
void* stack_bottom = threading::getStackBottom();
void* stack_top = threading::getStackTop();
#if STACK_GROWS_DOWN
collectRoots(&registers, stack_bottom, stack);
collectRoots(stack_top, stack_bottom, stack);
#else
collectRoots(stack_bottom, &registers + 1, stack);
collectRoots(stack_bottom, stack_top, stack);
#endif
}
......
......@@ -270,6 +270,9 @@ int main(int argc, char** argv) {
}
}
}
threading::finishMainThread();
_t.split("joinRuntime");
int rtncode = joinRuntime();
......
......@@ -36,6 +36,7 @@ namespace pyston {
static void generatorEntry(BoxedGenerator* g) {
assert(g->cls == generator_cls);
assert(g->function->cls == function_cls);
threading::pushGenerator(&g->returnContext);
try {
// call body of the generator
......@@ -50,6 +51,7 @@ static void generatorEntry(BoxedGenerator* g) {
// we returned from the body of the generator. next/send/throw will notify the caller
g->entryExited = true;
threading::popGenerator();
swapcontext(&g->context, &g->returnContext);
}
......@@ -107,7 +109,9 @@ extern "C" Box* yield(BoxedGenerator* obj, Box* value) {
BoxedGenerator* self = static_cast<BoxedGenerator*>(obj);
self->returnValue = value;
threading::popGenerator();
swapcontext(&self->context, &self->returnContext);
threading::pushGenerator(&self->returnContext);
// if the generator receives a exception from the caller we have to throw it
if (self->exception) {
......
# expected: fail
# - the GC will crash if any thread is inside a generator during a collection
# Allocate a few-dozen megabytes of memory inside a generator,
# to try to force a collection.
def f():
l = range(1000)
for i in xrange(10000):
l = list(l)
l = range(500)
# Something above the OSR threshold:
for i in xrange(12000):
l = (l * 2)[:500]
yield 0
print list(f())
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment