Commit 65c43848 authored by Kirill Smelkov's avatar Kirill Smelkov

libgolang: Fix select to wait for won-while-queueing case

During the second phase of select - after all cases were polled and
noone was found to be ready - cases are queued to corresponding channels
recv and send queues. While that queueing is in progress, a case, that
was already queued, could win (see f0b592b4 "select: Don't let both a
queued and a tried cases win at the same time" for details).

If such won case is detected, we break out of queuing loop, but
currently don't wait for that case to become ready. This is a bug,
because when a case is marked as won, its data is not yet copied - for
example for won recv case if we don't wait for that case to become
ready, we will be returning from select while corresponding *prx and
*rxok for recv waiter is still being copied in progress.

An example TSAN reports for this bug are as follows:

(1) WARNING: ThreadSanitizer: data race (pid=8223)
  Read of size 1 at 0x7b1800000a48 by main thread:
    #0 __chanselect2 golang/runtime/libgolang.cpp:1112 (liblibgolang.so.0.1+0x5fd6)
    #1 _chanselect2<true> golang/runtime/libgolang.cpp:949 (liblibgolang.so.0.1+0x6665)
    #2 _chanselect golang/runtime/libgolang.cpp:944 (liblibgolang.so.0.1+0x6665)
    #3 __pyx_f_6golang_7_golang__chanselect_pyexc golang/_golang.cpp:5896 (_golang.so+0x1deac)
    #4 __pyx_pf_6golang_7_golang_4pyselect golang/_golang.cpp:4935 (_golang.so+0x1deac)
    #5 __pyx_pw_6golang_7_golang_5pyselect golang/_golang.cpp:4355 (_golang.so+0x1deac)
    #6 PyEval_EvalFrameEx <null> (python2.7+0xf0e49)

  Previous write of size 1 at 0x7b1800000a48 by thread T57:
    #0 golang::_RecvSendWaiting::wakeup(bool) golang/runtime/libgolang.cpp:346 (liblibgolang.so.0.1+0x459d)
    #1 golang::_chan::_tryrecv(void*, bool*) golang/runtime/libgolang.cpp:730 (liblibgolang.so.0.1+0x511d)
    #2 __chanselect2 golang/runtime/libgolang.cpp:1074 (liblibgolang.so.0.1+0x5d4b)
    #3 _chanselect2<true> golang/runtime/libgolang.cpp:949 (liblibgolang.so.0.1+0x6665)
    #4 _chanselect golang/runtime/libgolang.cpp:944 (liblibgolang.so.0.1+0x6665)
    #5 __pyx_f_6golang_7_golang__chanselect_pyexc golang/_golang.cpp:5896 (_golang.so+0x1deac)
    #6 __pyx_pf_6golang_7_golang_4pyselect golang/_golang.cpp:4935 (_golang.so+0x1deac)
    #7 __pyx_pw_6golang_7_golang_5pyselect golang/_golang.cpp:4355 (_golang.so+0x1deac)
    #8 PyEval_EvalFrameEx <null> (python2.7+0xf0e49)
    #9 <null> <null> (python2.7+0x1929e3)

  Location is heap block of size 96 at 0x7b1800000a20 allocated by main thread:
    #0 calloc ../../../../src/libsanitizer/tsan/tsan_interceptors.cc:623 (libtsan.so.0+0x2b323)
    #1 calloc ../../../../src/libsanitizer/tsan/tsan_interceptors.cc:618 (libtsan.so.0+0x2b323)
    #2 __chanselect2 golang/runtime/libgolang.cpp:1018 (liblibgolang.so.0.1+0x5b8c)
    #3 _chanselect2<true> golang/runtime/libgolang.cpp:949 (liblibgolang.so.0.1+0x6665)
    #4 _chanselect golang/runtime/libgolang.cpp:944 (liblibgolang.so.0.1+0x6665)
    #5 __pyx_f_6golang_7_golang__chanselect_pyexc golang/_golang.cpp:5896 (_golang.so+0x1deac)
    #6 __pyx_pf_6golang_7_golang_4pyselect golang/_golang.cpp:4935 (_golang.so+0x1deac)
    #7 __pyx_pw_6golang_7_golang_5pyselect golang/_golang.cpp:4355 (_golang.so+0x1deac)
    #8 PyEval_EvalFrameEx <null> (python2.7+0xf0e49)

  Thread T57 (tid=13758, finished) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors.cc:915 (libtsan.so.0+0x2be1b)
    #1 PyThread_start_new_thread <null> (python2.7+0x19299f)
    #2 _taskgo golang/runtime/libgolang.cpp:123 (liblibgolang.so.0.1+0x3f98)
    #3 __pyx_f_6golang_7_golang__taskgo_pyexc golang/_golang.cpp:5926 (_golang.so+0x16f7e)
    #4 __pyx_pf_6golang_7_golang_2pygo golang/_golang.cpp:2399 (_golang.so+0x16f7e)
    #5 __pyx_pw_6golang_7_golang_3pygo golang/_golang.cpp:2324 (_golang.so+0x16f7e)
    #6 PyEval_EvalFrameEx <null> (python2.7+0xf0e49)

(2) WARNING: ThreadSanitizer: data race (pid=14185)
  Write of size 8 at 0x7b1800003000 by thread T95:
    #0 free ../../../../src/libsanitizer/tsan/tsan_interceptors.cc:649 (libtsan.so.0+0x2b46a)
    #1 free ../../../../src/libsanitizer/tsan/tsan_interceptors.cc:643 (libtsan.so.0+0x2b46a)
    #2 operator() golang/runtime/libgolang.cpp:1023 (liblibgolang.so.0.1+0x44bd)
    #3 _M_invoke /usr/include/c++/8/bits/std_function.h:297 (liblibgolang.so.0.1+0x44bd)
    #4 std::function<void ()>::operator()() const /usr/include/c++/8/bits/std_function.h:687 (liblibgolang.so.0.1+0x5fd8)
    #5 golang::_deferred::~_deferred() golang/runtime/libgolang.cpp:215 (liblibgolang.so.0.1+0x5fd8)
    #6 __chanselect2 golang/runtime/libgolang.cpp:1023 (liblibgolang.so.0.1+0x5fd8)
    #7 _chanselect2<true> golang/runtime/libgolang.cpp:949 (liblibgolang.so.0.1+0x6736)
    #8 _chanselect golang/runtime/libgolang.cpp:944 (liblibgolang.so.0.1+0x6736)
    #9 __pyx_f_6golang_7_golang__chanselect_pyexc golang/_golang.cpp:5896 (_golang.cpython-36m-x86_64-linux-gnu.so+0x1e562)
    #10 __pyx_pf_6golang_7_golang_4pyselect golang/_golang.cpp:4935 (_golang.cpython-36m-x86_64-linux-gnu.so+0x1e562)
    #11 __pyx_pw_6golang_7_golang_5pyselect golang/_golang.cpp:4355 (_golang.cpython-36m-x86_64-linux-gnu.so+0x1e562)
    #12 _PyCFunction_FastCallDict Objects/methodobject.c:231 (python3.6+0xd4db9)
    #13 pythread_wrapper Python/thread_pthread.h:205 (python3.6+0x6c5d6)

  Previous read of size 8 at 0x7b1800003000 by main thread:
    #0 golang::_RecvSendWaiting::wakeup(bool) golang/runtime/libgolang.cpp:347 (liblibgolang.so.0.1+0x4769)
    #1 golang::_chan::_trysend(void const*) golang/runtime/libgolang.cpp:661 (liblibgolang.so.0.1+0x5781)
    #2 _chanselect golang/runtime/libgolang.cpp:901 (liblibgolang.so.0.1+0x64d9)
    #3 __pyx_f_6golang_7_golang__chanselect_pyexc golang/_golang.cpp:5896 (_golang.cpython-36m-x86_64-linux-gnu.so+0x1e562)
    #4 __pyx_pf_6golang_7_golang_4pyselect golang/_golang.cpp:4935 (_golang.cpython-36m-x86_64-linux-gnu.so+0x1e562)
    #5 __pyx_pw_6golang_7_golang_5pyselect golang/_golang.cpp:4355 (_golang.cpython-36m-x86_64-linux-gnu.so+0x1e562)
    #6 _PyCFunction_FastCallDict Objects/methodobject.c:231 (python3.6+0xd4db9)

  Thread T95 (tid=16547, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors.cc:915 (libtsan.so.0+0x2be1b)
    #1 PyThread_start_new_thread Python/thread_pthread.h:252 (python3.6+0x6c67e)
    #2 _taskgo golang/runtime/libgolang.cpp:123 (liblibgolang.so.0.1+0x4158)
    #3 __pyx_f_6golang_7_golang__taskgo_pyexc golang/_golang.cpp:5926 (_golang.cpython-36m-x86_64-linux-gnu.so+0x1a9b5)
    #4 __pyx_pf_6golang_7_golang_2pygo golang/_golang.cpp:2399 (_golang.cpython-36m-x86_64-linux-gnu.so+0x1a9b5)
    #5 __pyx_pw_6golang_7_golang_3pygo golang/_golang.cpp:2324 (_golang.cpython-36m-x86_64-linux-gnu.so+0x1a9b5)
    #6 _PyCFunction_FastCallDict Objects/methodobject.c:231 (python3.6+0xd4db9)

-> Fix it by always waiting for WaitGroup's won case to become ready.

The bug was introduced in 3b241983 (Port/move channels to C/C++/Pyx). Before
that - when channels were implemented at Python level, we were always waiting
on select's group.

Added test catches the bug on all - even not under TSAN - builds.
parent dcf4ebd1
...@@ -174,12 +174,14 @@ cdef extern from * nogil: ...@@ -174,12 +174,14 @@ cdef extern from * nogil:
extern void _test_chan_vs_stackdeadwhileparked(); extern void _test_chan_vs_stackdeadwhileparked();
extern void _test_go_cpp(); extern void _test_go_cpp();
extern void _test_close_wakeup_all(); extern void _test_close_wakeup_all();
extern void _test_select_win_while_queue();
""" """
void _test_chan_cpp_refcount() except +topyexc void _test_chan_cpp_refcount() except +topyexc
void _test_chan_cpp() except +topyexc void _test_chan_cpp() except +topyexc
void _test_chan_vs_stackdeadwhileparked() except +topyexc void _test_chan_vs_stackdeadwhileparked() except +topyexc
void _test_go_cpp() except +topyexc void _test_go_cpp() except +topyexc
void _test_close_wakeup_all() except +topyexc void _test_close_wakeup_all() except +topyexc
void _test_select_win_while_queue() except +topyexc
def test_chan_cpp_refcount(): def test_chan_cpp_refcount():
with nogil: with nogil:
_test_chan_cpp_refcount() _test_chan_cpp_refcount()
...@@ -195,3 +197,6 @@ def test_go_cpp(): ...@@ -195,3 +197,6 @@ def test_go_cpp():
def test_close_wakeup_all(): def test_close_wakeup_all():
with nogil: with nogil:
_test_close_wakeup_all() _test_close_wakeup_all()
def test_select_win_while_queue():
with nogil:
_test_select_win_while_queue()
...@@ -372,17 +372,20 @@ bool _WaitGroup::try_to_win(_RecvSendWaiting *waiter) { // -> won ...@@ -372,17 +372,20 @@ bool _WaitGroup::try_to_win(_RecvSendWaiting *waiter) { // -> won
return won; return won;
} }
// wait waits for winning case of group to complete. // wait waits for winning case of the group to become ready.
void _WaitGroup::wait() { void _WaitGroup::wait() {
_WaitGroup *group = this; _WaitGroup *group = this;
group->_sema.acquire(); group->_sema.acquire();
} }
// wakeup wakes up the group. // wakeup notifies the group that the winning case becomes ready.
// //
// prior to wakeup try_to_win must have been called. // prior to wakeup try_to_win must have been called.
// in practice this means that waiters queued to chan.{_send|_recv}q must // in practice this means that waiters queued to chan.{_send|_recv}q must
// be dequeued with _dequeWaiter. // be dequeued with _dequeWaiter.
//
// it is ok to call wakeup before wait - wait won't miss the readiness
// notification.
void _WaitGroup::wakeup() { void _WaitGroup::wakeup() {
_WaitGroup *group = this; _WaitGroup *group = this;
if (group->which == NULL) if (group->which == NULL)
...@@ -393,6 +396,8 @@ void _WaitGroup::wakeup() { ...@@ -393,6 +396,8 @@ void _WaitGroup::wakeup() {
// _dequeWaiter dequeues a send or recv waiter from a channel's _recvq or _sendq. // _dequeWaiter dequeues a send or recv waiter from a channel's _recvq or _sendq.
// //
// the channel owning {_recv|_send}q must be locked. // the channel owning {_recv|_send}q must be locked.
// if the waiter is successfully dequeued, the caller must wake it up, but only
// after copying sent/recv data.
_RecvSendWaiting *_dequeWaiter(list_head *queue) { _RecvSendWaiting *_dequeWaiter(list_head *queue) {
while (!list_empty(queue)) { while (!list_empty(queue)) {
_RecvSendWaiting *w = list_entry(queue->next, _RecvSendWaiting, in_rxtxq); _RecvSendWaiting *w = list_entry(queue->next, _RecvSendWaiting, in_rxtxq);
...@@ -1046,7 +1051,7 @@ static int __chanselect2(const _selcase *casev, int casec, const vector<int>& nv ...@@ -1046,7 +1051,7 @@ static int __chanselect2(const _selcase *casev, int casec, const vector<int>& nv
// queuing other cases. // queuing other cases.
if (g->which != NULL) { if (g->which != NULL) {
ch->_mu.unlock(); ch->_mu.unlock();
goto ready; goto wait_case_ready;
} }
// send // send
...@@ -1100,8 +1105,8 @@ static int __chanselect2(const _selcase *casev, int casec, const vector<int>& nv ...@@ -1100,8 +1105,8 @@ static int __chanselect2(const _selcase *casev, int casec, const vector<int>& nv
} }
// wait for a case to become ready // wait for a case to become ready
wait_case_ready:
g->wait(); g->wait();
ready:
if (g->which == &_sel_txrx_prepoll_won) if (g->which == &_sel_txrx_prepoll_won)
bug("select: woke up with g.which=_sel_txrx_prepoll_won"); bug("select: woke up with g.which=_sel_txrx_prepoll_won");
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include <stdio.h> #include <stdio.h>
#include <tuple> #include <tuple>
#include <utility> #include <utility>
#include <string.h>
using namespace golang; using namespace golang;
using std::function; using std::function;
using std::move; using std::move;
...@@ -328,3 +329,55 @@ void _test_close_wakeup_all() { ...@@ -328,3 +329,55 @@ void _test_close_wakeup_all() {
for (i=0; i < 1+N; i++) for (i=0; i < 1+N; i++)
done.recv(); done.recv();
} }
// verify that select correctly handles situation where a case that is already
// queued wins while select queues other cases.
void __test_select_win_while_queue() {
const int Ncase = 1000; // many select cases to ↑ p(win-while-queue)
const int Ndata = 1*1024*1024; // big element size to ↑ time when case won, but not yet woken up
int i;
// Data is workaround for "error: function returning an array" if we use
// chan<char[Ndata]> directly.
struct Data { char _[Ndata]; };
auto ch = makechan<Data>();
auto ch2 = makechan<int>();
auto done = makechan<structZ>();
Data *data_send = (Data *)calloc(1, sizeof(Data));
Data *data_recv = (Data *)calloc(1, sizeof(Data));
if (data_send == NULL || data_recv == NULL)
throw std::bad_alloc();
for (i=0; i<Ndata; i++)
data_send->_[i] = i % 0xff;
// win first select case (see vvv) right after it is queued.
go([ch, data_send, done]() {
waitBlocked_RX(ch);
// select queued ch.recv and is likely still queing other cases.
// -> win ch.recv
ch.send(*data_send);
done.close();
});
// select {ch.recv, ch2.recv, ch2.recv, ch2.recv, ...}
_selcase casev[1+Ncase];
bool ok=false;
casev[0] = ch.recvs(data_recv, &ok);
for (i=0; i<Ncase; i++)
casev[1+i] = ch2.recvs();
int _ = select(casev);
ASSERT(_ == 0);
ASSERT(ok == true);
ASSERT(!memcmp(data_recv, data_send, sizeof(Data)));
done.recv();
free(data_send);
free(data_recv);
}
void _test_select_win_while_queue() {
int i, N = 100;
for (i=0; i<N; i++)
__test_select_win_while_queue();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment