Commit dfe1a8d7 authored by Kevin Modzelewski's avatar Kevin Modzelewski

Merge branch 'gen_fixes'

parents d50ce553 7c078f1e
...@@ -388,11 +388,11 @@ cpplint: ...@@ -388,11 +388,11 @@ cpplint:
check: check:
@# These are ordered roughly in decreasing order of (chance will expose issue) / (time to run test) @# These are ordered roughly in decreasing order of (chance will expose issue) / (time to run test)
$(MAKE) lint $(MAKE) lint
$(MAKE) check_format
$(MAKE) ext_python ext_pyston pyston_dbg $(MAKE) ext_python ext_pyston pyston_dbg
$(MAKE) check_dbg $(MAKE) check_dbg
$(MAKE) check_format
$(MAKE) run_unittests $(MAKE) run_unittests
@# jit_prof forces the use of GCC as the compiler, which can expose other errors, so just build it and see what happens: @# jit_prof forces the use of GCC as the compiler, which can expose other errors, so just build it and see what happens:
......
...@@ -281,6 +281,8 @@ public: ...@@ -281,6 +281,8 @@ public:
} }
Value ASTInterpreter::execute(ASTInterpreter& interpreter, AST_stmt* start_at) { Value ASTInterpreter::execute(ASTInterpreter& interpreter, AST_stmt* start_at) {
threading::allowGLReadPreemption();
assert(start_at == NULL); assert(start_at == NULL);
void* frame_addr = __builtin_frame_address(0); void* frame_addr = __builtin_frame_address(0);
...@@ -409,7 +411,11 @@ Value ASTInterpreter::visit_branch(AST_Branch* node) { ...@@ -409,7 +411,11 @@ Value ASTInterpreter::visit_branch(AST_Branch* node) {
} }
Value ASTInterpreter::visit_jump(AST_Jump* node) { Value ASTInterpreter::visit_jump(AST_Jump* node) {
if (ENABLE_OSR && node->target->idx < current_block->idx && compiled_func) { bool backedge = node->target->idx < current_block->idx && compiled_func;
if (backedge)
threading::allowGLReadPreemption();
if (ENABLE_OSR && backedge) {
++edgecount; ++edgecount;
if (edgecount > OSR_THRESHOLD) { if (edgecount > OSR_THRESHOLD) {
eraseDeadSymbols(); eraseDeadSymbols();
......
...@@ -421,10 +421,14 @@ extern "C" void endAllowThreads() { ...@@ -421,10 +421,14 @@ extern "C" void endAllowThreads() {
static pthread_mutex_t gil = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t gil = PTHREAD_MUTEX_INITIALIZER;
static std::atomic<int> threads_waiting_on_gil(0); static std::atomic<int> threads_waiting_on_gil(0);
static pthread_cond_t gil_acquired = PTHREAD_COND_INITIALIZER;
void acquireGLWrite() { void acquireGLWrite() {
threads_waiting_on_gil++; threads_waiting_on_gil++;
pthread_mutex_lock(&gil); pthread_mutex_lock(&gil);
threads_waiting_on_gil--; threads_waiting_on_gil--;
pthread_cond_signal(&gil_acquired);
} }
void releaseGLWrite() { void releaseGLWrite() {
...@@ -435,17 +439,28 @@ void releaseGLWrite() { ...@@ -435,17 +439,28 @@ void releaseGLWrite() {
// Note: this doesn't need to be an atomic, since it should // Note: this doesn't need to be an atomic, since it should
// only be accessed by the thread that holds the gil: // only be accessed by the thread that holds the gil:
int gil_check_count = 0; int gil_check_count = 0;
// TODO: this function is fair in that it forces a thread to give up the GIL
// after a bounded amount of time, but currently we have no guarantees about
// who it will release the GIL to. So we could have two threads that are
// switching back and forth, and a third that never gets run.
// We could enforce fairness by having a FIFO of events (implementd with mutexes?)
// and make sure to always wake up the longest-waiting one.
void allowGLReadPreemption() { void allowGLReadPreemption() {
// Can read this variable with relaxed consistency; not a huge problem if // Double-checked locking: first read with no ordering constraint:
// we accidentally read a stale value for a little while.
if (!threads_waiting_on_gil.load(std::memory_order_relaxed)) if (!threads_waiting_on_gil.load(std::memory_order_relaxed))
return; return;
gil_check_count++; gil_check_count++;
if (gil_check_count >= GIL_CHECK_INTERVAL) { if (gil_check_count >= GIL_CHECK_INTERVAL) {
gil_check_count = 0; gil_check_count = 0;
releaseGLRead();
acquireGLRead(); // Double check this, since if we are wrong about there being a thread waiting on the gil,
// we're going to get stuck in the following pthread_cond_wait:
if (!threads_waiting_on_gil.load(std::memory_order_seq_cst))
return;
pthread_cond_wait(&gil_acquired, &gil);
} }
} }
#elif THREADING_USE_GRWL #elif THREADING_USE_GRWL
......
...@@ -61,12 +61,17 @@ Box* generatorSend(Box* s, Box* v) { ...@@ -61,12 +61,17 @@ Box* generatorSend(Box* s, Box* v) {
assert(s->cls == generator_cls); assert(s->cls == generator_cls);
BoxedGenerator* self = static_cast<BoxedGenerator*>(s); BoxedGenerator* self = static_cast<BoxedGenerator*>(s);
if (self->running)
raiseExcHelper(ValueError, "generator already executing");
// check if the generator already exited // check if the generator already exited
if (self->entryExited) if (self->entryExited)
raiseExcHelper(StopIteration, ""); raiseExcHelper(StopIteration, "");
self->returnValue = v; self->returnValue = v;
self->running = true;
swapcontext(&self->returnContext, &self->context); swapcontext(&self->returnContext, &self->context);
self->running = false;
// propagate exception to the caller // propagate exception to the caller
if (self->exception) if (self->exception)
...@@ -130,7 +135,7 @@ extern "C" BoxedGenerator* createGenerator(BoxedFunction* function, Box* arg1, B ...@@ -130,7 +135,7 @@ extern "C" BoxedGenerator* createGenerator(BoxedFunction* function, Box* arg1, B
extern "C" BoxedGenerator::BoxedGenerator(BoxedFunction* function, Box* arg1, Box* arg2, Box* arg3, Box** args) extern "C" BoxedGenerator::BoxedGenerator(BoxedFunction* function, Box* arg1, Box* arg2, Box* arg3, Box** args)
: Box(generator_cls), function(function), arg1(arg1), arg2(arg2), arg3(arg3), args(nullptr), entryExited(false), : Box(generator_cls), function(function), arg1(arg1), arg2(arg2), arg3(arg3), args(nullptr), entryExited(false),
returnValue(nullptr), exception(nullptr) { running(false), returnValue(nullptr), exception(nullptr) {
giveAttr("__name__", boxString(function->f->source->getName())); giveAttr("__name__", boxString(function->f->source->getName()));
......
...@@ -524,6 +524,7 @@ public: ...@@ -524,6 +524,7 @@ public:
GCdArray* args; GCdArray* args;
bool entryExited; bool entryExited;
bool running;
Box* returnValue; Box* returnValue;
Box* exception; Box* exception;
......
# From PEP 255: "A generator cannot be resumed while it is actively running"
def g():
i = me.next()
yield i
me = g()
try:
me.next()
except ValueError, e:
# Should be: "generator already executing"
print e
# Start a generator on one thread, pass it to another, and have that execute it for a while
def gen(): def gen():
while True: while True:
for i in xrange(100): for i in xrange(100):
......
# Pass a started generator to two different threads, and make them both
# try to run it at the same time.
started = []
def gen():
started.append(None)
while len(started) == 1:
pass
yield "done"
done = []
def run_through(g, i):
if i == 0:
print g.next()
else:
while len(started) < 1:
pass
try:
print g.next()
except ValueError, e:
print e
started.append(None)
done.append(None)
g = gen()
from thread import start_new_thread
start_new_thread(run_through, (g, 0))
start_new_thread(run_through, (g, 1))
import time
while len(done) < 2:
time.sleep(0.01)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment