Commit dcf4ebd1 authored by Kirill Smelkov's avatar Kirill Smelkov

libgolang: Fix chan.close to dequeue subscribers atomically

Currently chan.close iterates all send/recv subscribers
unlocking/relocking the channel for each and notifying dequeued
subscriber with channel unlocked. This leads to that even if channel had
only one subscriber, chan.close accesses chan._mu again - after
notifying that subscriber. That in turn means that an idiom where e.g.
a done channel is passed to worker, which worker closes at the end, and
main task waiting on the done and destroying done right after wakeup
cannot work - because close, internally, accesses already destroyed
channel as the following TSAN report shows for _test_go_c:

    WARNING: ThreadSanitizer: data race (pid=7143)
      Write of size 8 at 0x7b1400000650 by main thread:
        #0 free ../../../../src/libsanitizer/tsan/tsan_interceptors.cc:649 (libtsan.so.0+0x2b46a)
        #1 free ../../../../src/libsanitizer/tsan/tsan_interceptors.cc:643 (libtsan.so.0+0x2b46a)
        #2 golang::_chan::decref() golang/runtime/libgolang.cpp:470 (liblibgolang.so.0.1+0x47f2)
        #3 _chanxdecref golang/runtime/libgolang.cpp:452 (liblibgolang.so.0.1+0x484a)
        #4 _test_go_c golang/runtime/libgolang_test_c.c:86 (_golang_test.so+0x13a2e)
        #5 __pyx_pf_6golang_12_golang_test_12test_go_c golang/_golang_test.cpp:3340 (_golang_test.so+0xcbaa)
        #6 __pyx_pw_6golang_12_golang_test_13test_go_c golang/_golang_test.cpp:3305 (_golang_test.so+0xcbaa)
        #7 PyEval_EvalFrameEx <null> (python2.7+0xf68b4)

      Previous read of size 8 at 0x7b1400000650 by thread T8:
        #0 golang::Sema::acquire() golang/runtime/libgolang.cpp:164 (liblibgolang.so.0.1+0x410a)
        #1 golang::Mutex::lock() golang/runtime/libgolang.cpp:175 (liblibgolang.so.0.1+0x4c82)
        #2 golang::_chan::close() golang/runtime/libgolang.cpp:754 (liblibgolang.so.0.1+0x4c82)
        #3 _chanclose golang/runtime/libgolang.cpp:732 (liblibgolang.so.0.1+0x4d1a)
        #4 _work golang/runtime/libgolang_test_c.c:92 (_golang_test.so+0x136cc)
        #5 <null> <null> (python2.7+0x1929e3)

      Thread T8 (tid=7311, finished) created by main thread at:
        #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors.cc:915 (libtsan.so.0+0x2be1b)
        #1 PyThread_start_new_thread <null> (python2.7+0x19299f)
        #2 _taskgo golang/runtime/libgolang.cpp:119 (liblibgolang.so.0.1+0x3f68)
        #3 _test_go_c golang/runtime/libgolang_test_c.c:84 (_golang_test.so+0x13a1c)
        #4 __pyx_pf_6golang_12_golang_test_12test_go_c golang/_golang_test.cpp:3340 (_golang_test.so+0xcbaa)
        #5 __pyx_pw_6golang_12_golang_test_13test_go_c golang/_golang_test.cpp:3305 (_golang_test.so+0xcbaa)
        #6 PyEval_EvalFrameEx <null> (python2.7+0xf68b4)

-> Fix close to dequeue all channel's subscribers atomically, and notify
them all after channel is unlocked and _no_ longer accessed.

Close was already working this way when channels were done at Python
level, but in 3b241983 (Port/move channels to C/C++/Pyx) I introduced
this bug while trying to avoid additional memory allocation in close.

Added test catches the bug on all - even not under TSAN - builds.

----

Added test also reveals another bug: recv<onstack=false> uses channel
after wakeup, and, as at the time of wakeup the channel could be
already destroyed, that segfaults. Fix it by pre-reading in recv
everything needed from _chan object before going to sleep.

This fix cannot go separately from close fix, as fixed close is required
for recv-uses-chan-after-wakeup testcase.
parent c92a4830
...@@ -173,11 +173,13 @@ cdef extern from * nogil: ...@@ -173,11 +173,13 @@ cdef extern from * nogil:
extern void _test_chan_cpp(); extern void _test_chan_cpp();
extern void _test_chan_vs_stackdeadwhileparked(); extern void _test_chan_vs_stackdeadwhileparked();
extern void _test_go_cpp(); extern void _test_go_cpp();
extern void _test_close_wakeup_all();
""" """
void _test_chan_cpp_refcount() except +topyexc void _test_chan_cpp_refcount() except +topyexc
void _test_chan_cpp() except +topyexc void _test_chan_cpp() except +topyexc
void _test_chan_vs_stackdeadwhileparked() except +topyexc void _test_chan_vs_stackdeadwhileparked() except +topyexc
void _test_go_cpp() except +topyexc void _test_go_cpp() except +topyexc
void _test_close_wakeup_all() except +topyexc
def test_chan_cpp_refcount(): def test_chan_cpp_refcount():
with nogil: with nogil:
_test_chan_cpp_refcount() _test_chan_cpp_refcount()
...@@ -190,3 +192,6 @@ def test_chan_vs_stackdeadwhileparked(): ...@@ -190,3 +192,6 @@ def test_chan_vs_stackdeadwhileparked():
def test_go_cpp(): def test_go_cpp():
with nogil: with nogil:
_test_go_cpp() _test_go_cpp()
def test_close_wakeup_all():
with nogil:
_test_close_wakeup_all()
...@@ -465,6 +465,10 @@ void _chan::decref() { ...@@ -465,6 +465,10 @@ void _chan::decref() {
return; return;
// refcnt=0 -> free the channel // refcnt=0 -> free the channel
if (!list_empty(&ch->_recvq))
panic("chan: decref: free: recvq not empty");
if (!list_empty(&ch->_sendq))
panic("chan: decref: free: sendq not empty");
ch->_mu.~Mutex(); ch->_mu.~Mutex();
memset((void *)ch, 0, sizeof(*ch) + ch->_cap*ch->_elemsize); memset((void *)ch, 0, sizeof(*ch) + ch->_cap*ch->_elemsize);
free(ch); free(ch);
...@@ -588,7 +592,8 @@ template<> bool _chan::_recv2_</*onstack=*/false>(void *prx) { _chan *ch = this ...@@ -588,7 +592,8 @@ template<> bool _chan::_recv2_</*onstack=*/false>(void *prx) { _chan *ch = this
return __recv2_(prx, g.get(), me.get()); return __recv2_(prx, g.get(), me.get());
// prx stack -> onheap + copy back (if prx is on stack) TODO avoid copy if prx is !onstack // prx stack -> onheap + copy back (if prx is on stack) TODO avoid copy if prx is !onstack
void *prx_onheap = malloc(ch->_elemsize); unsigned ch_elemsize = ch->_elemsize;
void *prx_onheap = malloc(ch_elemsize);
if (prx_onheap == NULL) { if (prx_onheap == NULL) {
ch->_mu.unlock(); ch->_mu.unlock();
throw bad_alloc(); throw bad_alloc();
...@@ -598,7 +603,8 @@ template<> bool _chan::_recv2_</*onstack=*/false>(void *prx) { _chan *ch = this ...@@ -598,7 +603,8 @@ template<> bool _chan::_recv2_</*onstack=*/false>(void *prx) { _chan *ch = this
}); });
bool ok = __recv2_(prx_onheap, g.get(), me.get()); bool ok = __recv2_(prx_onheap, g.get(), me.get());
memcpy(prx, prx_onheap, ch->_elemsize); // NOTE don't access ch after wakeup
memcpy(prx, prx_onheap, ch_elemsize);
return ok; return ok;
} }
...@@ -741,30 +747,33 @@ void _chan::close() { ...@@ -741,30 +747,33 @@ void _chan::close() {
} }
ch->_closed = true; ch->_closed = true;
// wake-up all readers // TODO better relink dequed waiters to separate wakeup queue
vector<_RecvSendWaiting*> wakeupv;
// schedule: wake-up all readers
while (1) { while (1) {
_RecvSendWaiting *recv = _dequeWaiter(&ch->_recvq); _RecvSendWaiting *recv = _dequeWaiter(&ch->_recvq);
if (recv == NULL) if (recv == NULL)
break; break;
ch->_mu.unlock();
if (recv->pdata != NULL) if (recv->pdata != NULL)
memset(recv->pdata, 0, ch->_elemsize); memset(recv->pdata, 0, ch->_elemsize);
recv->wakeup(/*ok=*/false); wakeupv.push_back(recv);
ch->_mu.lock();
} }
// wake-up all writers (they will panic) // schedule: wake-up all writers (they will panic)
while (1) { while (1) {
_RecvSendWaiting *send = _dequeWaiter(&ch->_sendq); _RecvSendWaiting *send = _dequeWaiter(&ch->_sendq);
if (send == NULL) if (send == NULL)
break; break;
ch->_mu.unlock(); wakeupv.push_back(send);
send->wakeup(/*ok=*/false);
ch->_mu.lock();
} }
ch->_mu.unlock(); ch->_mu.unlock();
// perform scheduled wakeups outside of ch._mu
for (auto w : wakeupv)
w->wakeup(/*ok=*/false);
} }
// len returns current number of buffered elements. // len returns current number of buffered elements.
......
...@@ -280,3 +280,51 @@ static void _work(int i, chan<structZ> done) { ...@@ -280,3 +280,51 @@ static void _work(int i, chan<structZ> done) {
panic("_work: i != 111"); panic("_work: i != 111");
done.close(); done.close();
} }
// verify that chan close wakes up all consumers atomically - in other words
// that it is safe to e.g. destroy the channel after recv wakeup caused by close.
//
// this also verifies that recv, upon wakeup, does not use channel
// object when it could be already destroyed.
void _test_close_wakeup_all() {
int i, N = 100;
auto ch = makechan<int>();
auto _ch = ch._rawchan();
auto done = makechan<structZ>();
// ch.recv subscriber that destroys ch right after wakeup.
// ch ownership is transferred to this goroutine.
go([ch, done]() mutable {
ch.recv();
// destroy ch _before_ signalling on done. This should be safe to do
// as other workers vvv don't use ch after wakeup from ch.recv().
ASSERT(_chanrefcnt(ch._rawchan()) == 1);
ch = NULL;
done.send(structZ{});
});
waitBlocked(_ch, /*nrx=*/1, /*ntx=*/0);
ASSERT(_chanrefcnt(_ch) == 2);
ch = NULL;
ASSERT(_chanrefcnt(_ch) == 1);
// many other ch.recv subscribers queued to ch.recvq
// their lifetime is subset of ^^^ subscriber lifetime; they don't own a ch reference.
for (i=0; i < N; i++) {
go([_ch, done]() {
_chanrecv(_ch, NULL);
done.send(structZ{});
});
}
// wait till all workers block in ch.recv()
waitBlocked(_ch, /*nrx=*/1+N, 0);
// ch.close() must wake up all workers atomically. If it is not the case,
// this will reliably (N >> 1) trigger assert in chan decref on len(ch.recvq) == 0.
ASSERT(_chanrefcnt(_ch) == 1);
_chanclose(_ch);
// wait till all workers finish
for (i=0; i < 1+N; i++)
done.recv();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment