Commit 273bf3d8 authored by Kevin Modzelewski

Add some PerThreadSet features

- some extra debugging for the threading_local issue
- remove the per-thread data of threads that do not survive a fork()
parent ded39f2a
...@@ -144,7 +144,28 @@ template <int N, int... S> struct gens : gens<N - 1, N - 1, S...> {}; ...@@ -144,7 +144,28 @@ template <int N, int... S> struct gens : gens<N - 1, N - 1, S...> {};
template <int... S> struct gens<0, S...> { typedef seq<S...> type; }; template <int... S> struct gens<0, S...> { typedef seq<S...> type; };
} }
template <typename T, typename... CtorArgs> class PerThreadSet { class PerThreadSetBase {
private:
static std::unordered_set<PerThreadSetBase*> all_instances;
protected:
PerThreadSetBase() {
    // Record this instance in the registry that runAllForkHandlers() walks.
    // NOTE(review): all_instances is a static data member defined out of line; an instance
    // constructed during static initialization of another translation unit could run before
    // the set itself has been constructed -- confirm no such global instances exist.
    all_instances.insert(this);
}
// Removes this instance from all_instances so that runAllForkHandlers() no longer visits it.
virtual ~PerThreadSetBase() {
// The constructor registered this instance, so it must still be present here.
assert(all_instances.count(this));
all_instances.erase(this);
}
// Hook called on every registered instance by runAllForkHandlers(); each subclass supplies
// its own fork handling.
virtual void onFork() = 0;
public:
static void runAllForkHandlers() {
for (auto inst : all_instances)
inst->onFork();
}
};
template <typename T, typename... CtorArgs> class PerThreadSet : public PerThreadSetBase {
private: private:
pthread_key_t pthread_key; pthread_key_t pthread_key;
PthreadFastMutex lock; PthreadFastMutex lock;
...@@ -159,20 +180,26 @@ private: ...@@ -159,20 +180,26 @@ private:
std::unordered_map<pthread_t, Storage*> map; std::unordered_map<pthread_t, Storage*> map;
std::tuple<CtorArgs...> ctor_args; std::tuple<CtorArgs...> ctor_args;
#ifndef NDEBUG
int map_elts = 0;
#endif
// Destructor callback handed to pthread_key_create() for this set's key.  `val` is the
// Storage pointer that was installed with pthread_setspecific().  While holding the owning
// set's lock it removes the calling thread's entry from `map`, keeps the debug-only
// `map_elts` counter in step, and frees the Storage.
static void dtor(void* val) { static void dtor(void* val) {
Storage* s = static_cast<Storage*>(val); Storage* s = static_cast<Storage*>(val);
assert(s); assert(s);
auto* self = s->self; auto* self = s->self;
LOCK_REGION(&self->lock); LOCK_REGION(&self->lock);
// Debug cross-check: the shadow counter must match the real map size before we modify it.
// NOTE(review): map.size() is a size_t; "%ld" assumes it is long-sized ("%zu" is the
// portable specifier) -- confirm what the ASSERT macro expects.
ASSERT(self->map.size() == self->map_elts, "%ld %d", self->map.size(), self->map_elts);
assert(s->my_tid == pthread_self()); assert(s->my_tid == pthread_self());
// I assume this destructor gets called on the same thread
// that this data is bound to:
assert(self->map.count(pthread_self())); assert(self->map.count(pthread_self()));
self->map.erase(pthread_self()); self->map.erase(pthread_self());
// map_elts only exists in debug builds, hence the guard.
#ifndef NDEBUG
self->map_elts--;
#endif
delete s; delete s;
} }
...@@ -185,6 +212,21 @@ private: ...@@ -185,6 +212,21 @@ private:
.val = T(std::get<S>(ctor_args)...) }; .val = T(std::get<S>(ctor_args)...) };
} }
protected:
// Fork hook: discards the Storage of every thread other than the one running this call,
// keeping only the calling thread's entry in `map`.
// NOTE(review): `lock` is not taken here; presumably that is safe because no other thread
// exists in the forked child -- confirm.
void onFork() override {
    const pthread_t self_tid = pthread_self();

    auto cur = this->map.begin();
    while (cur != this->map.end()) {
        // The calling thread keeps its data; just step past it.
        if (cur->first == self_tid) {
            ++cur;
            continue;
        }

        // Any other entry belongs to a different thread: free it and drop it from the map.
        delete cur->second;
        cur = this->map.erase(cur);
#ifndef NDEBUG
        this->map_elts--;
#endif
    }
}
public: public:
PerThreadSet(CtorArgs... ctor_args) : ctor_args(std::forward<CtorArgs>(ctor_args)...) { PerThreadSet(CtorArgs... ctor_args) : ctor_args(std::forward<CtorArgs>(ctor_args)...) {
int code = pthread_key_create(&pthread_key, &dtor); int code = pthread_key_create(&pthread_key, &dtor);
...@@ -214,6 +256,12 @@ public: ...@@ -214,6 +256,12 @@ public:
s = make(typename impl::gens<sizeof...(CtorArgs)>::type()); s = make(typename impl::gens<sizeof...(CtorArgs)>::type());
LOCK_REGION(&lock); LOCK_REGION(&lock);
#ifndef NDEBUG
assert(map.size() == map_elts);
map_elts++;
#endif
int code = pthread_setspecific(pthread_key, s); int code = pthread_setspecific(pthread_key, s);
assert(code == 0); assert(code == 0);
......
...@@ -35,6 +35,8 @@ ...@@ -35,6 +35,8 @@
namespace pyston { namespace pyston {
namespace threading { namespace threading {
std::unordered_set<PerThreadSetBase*> PerThreadSetBase::all_instances;
extern "C" { extern "C" {
__thread PyThreadState cur_thread_state __thread PyThreadState cur_thread_state
= { 0, NULL, NULL, NULL, NULL }; // not sure if we need to explicitly request zero-initialization = { 0, NULL, NULL, NULL, NULL }; // not sure if we need to explicitly request zero-initialization
...@@ -508,7 +510,7 @@ extern "C" void PyEval_ReInitThreads() noexcept { ...@@ -508,7 +510,7 @@ extern "C" void PyEval_ReInitThreads() noexcept {
num_starting_threads = 0; num_starting_threads = 0;
threads_waiting_on_gil = 0; threads_waiting_on_gil = 0;
// TODO we should clean up all created PerThreadSets, such as the one used in the heap for thread-local-caches. PerThreadSetBase::runAllForkHandlers();
} }
void acquireGLWrite() { void acquireGLWrite() {
......
...@@ -113,10 +113,10 @@ private: ...@@ -113,10 +113,10 @@ private:
public: public:
// Creates a stack for the given visit type and calls get_chunk() up front
// (presumably to acquire the first chunk of storage -- confirm against get_chunk()).
TraceStack(TraceStackType type) : visit_type(type) { get_chunk(); } TraceStack(TraceStackType type) : visit_type(type) { get_chunk(); }
// Creates a stack for the given visit type and pushes every pointer from the supplied root
// set onto it.  Each root is asserted to be not yet marked before it is pushed.
TraceStack(TraceStackType type, const std::unordered_set<void*>& root_handles) : visit_type(type) { TraceStack(TraceStackType type, const std::unordered_set<void*>& roots) : visit_type(type) {
get_chunk(); get_chunk();
for (void* p : root_handles) { for (void* p : roots) {
assert(!isMarked(GCAllocation::fromUserData(p))); ASSERT(!isMarked(GCAllocation::fromUserData(p)), "");
push(p); push(p);
} }
} }
......
...@@ -411,7 +411,37 @@ GCAllocation* SmallArena::allocationFrom(void* ptr) { ...@@ -411,7 +411,37 @@ GCAllocation* SmallArena::allocationFrom(void* ptr) {
return reinterpret_cast<GCAllocation*>(&b->atoms[atom_idx]); return reinterpret_cast<GCAllocation*>(&b->atoms[atom_idx]);
} }
#ifndef NDEBUG
void SmallArena::assertConsistent() {
std::unordered_set<Block*> seen_blocks;
auto scan = [&seen_blocks](Block* h) {
while (h) {
ASSERT(h >= (void*)SMALL_ARENA_START && h < (void*)LARGE_ARENA_START, "%p", h);
assert(!seen_blocks.count(h));
seen_blocks.insert(h);
if (h->next)
assert(h->next->prev == &h->next);
h = h->next;
}
};
thread_caches.forEachValue([&scan](ThreadBlockCache* cache) {
for (int bidx = 0; bidx < NUM_BUCKETS; bidx++) {
scan(cache->cache_free_heads[bidx]);
scan(cache->cache_full_heads[bidx]);
}
});
for (int bidx = 0; bidx < NUM_BUCKETS; bidx++) {
scan(full_heads[bidx]);
scan(heads[bidx]);
}
}
#endif
void SmallArena::freeUnmarked(std::vector<Box*>& weakly_referenced) { void SmallArena::freeUnmarked(std::vector<Box*>& weakly_referenced) {
assertConsistent();
thread_caches.forEachValue([this, &weakly_referenced](ThreadBlockCache* cache) { thread_caches.forEachValue([this, &weakly_referenced](ThreadBlockCache* cache) {
for (int bidx = 0; bidx < NUM_BUCKETS; bidx++) { for (int bidx = 0; bidx < NUM_BUCKETS; bidx++) {
Block* h = cache->cache_free_heads[bidx]; Block* h = cache->cache_free_heads[bidx];
......
...@@ -274,6 +274,12 @@ public: ...@@ -274,6 +274,12 @@ public:
void prepareForCollection() {} void prepareForCollection() {}
void cleanupAfterCollection() {} void cleanupAfterCollection() {}
// Debug builds declare the real consistency check (defined out of line); NDEBUG builds get
// an empty inline stub, so call sites can invoke assertConsistent() unconditionally.
#ifndef NDEBUG
void assertConsistent();
#else
void assertConsistent() {}
#endif
private: private:
template <int N> class Bitmap { template <int N> class Bitmap {
static_assert(N % 64 == 0, ""); static_assert(N % 64 == 0, "");
......
...@@ -17,6 +17,7 @@ if __name__ == '__main__': ...@@ -17,6 +17,7 @@ if __name__ == '__main__':
p = multiprocessing.Process(target=f, args=('bob',)) p = multiprocessing.Process(target=f, args=('bob',))
p.start() p.start()
p.join() p.join()
print p.exitcode
def f(q): def f(q):
...@@ -28,3 +29,5 @@ if __name__ == '__main__': ...@@ -28,3 +29,5 @@ if __name__ == '__main__':
p.start() p.start()
print q.get() # prints "[42, None, 'hello']" print q.get() # prints "[42, None, 'hello']"
p.join() p.join()
print p.exitcode
print "done"
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment