Commit 2590e9a7 authored by Kirill Smelkov's avatar Kirill Smelkov

libgolang: _selcase: Introduce accessors to get pointers to for-send and for-recv data

In the next patch we are going to teach _selcase to support both
external and inplace data. Before that let's do a couple of preparatory
things.

This patch: introduces .ptx() and .prx() accessors to get to
corresponding data buffer associated with _selcase. Convert _selcase
users to use .ptx() and .prx() instead of doing direct .ptxrx access.
This way when we'll add inplace support to _selcase, we'll need to adapt
only accessors, not clients.

The only place that is left using .ptxrx directly is one tricky place in
_chanselect2<onstack=false> where gevent-related code needs to carefully
deal with proxying tx/rx buffers due to STACK_DEAD_WHILE_PARKED.

NOTE even though new accessors may panic, libgolang.cpp always calls them
after checking that the conditions for ptx/prx calls are valid.
parent d6c8862d
...@@ -103,10 +103,14 @@ cdef extern from "golang/libgolang.h" namespace "golang" nogil: ...@@ -103,10 +103,14 @@ cdef extern from "golang/libgolang.h" namespace "golang" nogil:
_CHANSEND _CHANSEND
_CHANRECV _CHANRECV
_DEFAULT _DEFAULT
struct _selcase: cppclass _selcase:
_chanop op _chanop op
void *ptxrx void *ptxrx
cbool *rxok cbool *rxok
const void *ptx() const
void *prx() const
const _selcase default "golang::_default" const _selcase default "golang::_default"
int select(_selcase casev[]) int select(_selcase casev[])
......
...@@ -331,7 +331,7 @@ def pyselect(*pycasev): ...@@ -331,7 +331,7 @@ def pyselect(*pycasev):
# decref not sent tx (see ^^^ send prepare) # decref not sent tx (see ^^^ send prepare)
for i in range(n): for i in range(n):
if casev[i].op == _CHANSEND and (i != selected): if casev[i].op == _CHANSEND and (i != selected):
p_tx = <PyObject **>casev[i].ptxrx p_tx = <PyObject **>casev[i].ptx()
_tx = p_tx[0] _tx = p_tx[0]
tx = <object>_tx tx = <object>_tx
Py_DECREF(tx) Py_DECREF(tx)
......
...@@ -156,6 +156,16 @@ typedef struct _selcase { ...@@ -156,6 +156,16 @@ typedef struct _selcase {
enum _chanop op; // chansend/chanrecv/default enum _chanop op; // chansend/chanrecv/default
void *ptxrx; // chansend: ptx; chanrecv: prx void *ptxrx; // chansend: ptx; chanrecv: prx
bool *rxok; // chanrecv: where to save ok if !NULL; otherwise not used bool *rxok; // chanrecv: where to save ok if !NULL; otherwise not used
#ifdef __cplusplus
// ptx returns pointer to data to send for this case.
// .op must be _CHANSEND.
LIBGOLANG_API const void *ptx() const;
// prx returns pointer for this case to receive data into.
// .op must be _CHANRECV.
LIBGOLANG_API void *prx() const;
#endif
} _selcase; } _selcase;
LIBGOLANG_API int _chanselect(const _selcase *casev, int casec); LIBGOLANG_API int _chanselect(const _selcase *casev, int casec);
......
...@@ -863,6 +863,21 @@ const _selcase _default = { ...@@ -863,6 +863,21 @@ const _selcase _default = {
.rxok = NULL, .rxok = NULL,
}; };
const void *_selcase::ptx() const {
const _selcase *cas = this;
if (cas->op != _CHANSEND)
panic("_selcase: ptx: op != send");
return cas->ptxrx;
}
void *_selcase::prx() const {
const _selcase *cas = this;
if (cas->op != _CHANRECV)
panic("_selcase: prx: op != recv");
return cas->ptxrx;
}
static const _RecvSendWaiting _sel_txrx_prepoll_won; static const _RecvSendWaiting _sel_txrx_prepoll_won;
template<bool onstack> static int _chanselect2(const _selcase *, int, const vector<int>&); template<bool onstack> static int _chanselect2(const _selcase *, int, const vector<int>&);
template<> int _chanselect2</*onstack=*/true> (const _selcase *, int, const vector<int>&); template<> int _chanselect2</*onstack=*/true> (const _selcase *, int, const vector<int>&);
...@@ -916,7 +931,7 @@ int _chanselect(const _selcase *casev, int casec) { ...@@ -916,7 +931,7 @@ int _chanselect(const _selcase *casev, int casec) {
if (ch != NULL) { // nil chan is never ready if (ch != NULL) { // nil chan is never ready
ch->_mu.lock(); ch->_mu.lock();
if (1) { if (1) {
bool done = ch->_trysend(cas->ptxrx); bool done = ch->_trysend(cas->ptx());
if (done) if (done)
return n; return n;
} }
...@@ -930,7 +945,7 @@ int _chanselect(const _selcase *casev, int casec) { ...@@ -930,7 +945,7 @@ int _chanselect(const _selcase *casev, int casec) {
if (ch != NULL) { // nil chan is never ready if (ch != NULL) { // nil chan is never ready
ch->_mu.lock(); ch->_mu.lock();
if (1) { if (1) {
bool ok, done = ch->_tryrecv(cas->ptxrx, &ok); bool ok, done = ch->_tryrecv(cas->prx(), &ok);
if (done) { if (done) {
if (cas->rxok != NULL) if (cas->rxok != NULL)
*cas->rxok = ok; *cas->rxok = ok;
...@@ -1084,7 +1099,7 @@ static int __chanselect2(const _selcase *casev, int casec, const vector<int>& nv ...@@ -1084,7 +1099,7 @@ static int __chanselect2(const _selcase *casev, int casec, const vector<int>& nv
// send // send
if (cas->op == _CHANSEND) { if (cas->op == _CHANSEND) {
bool done = ch->_trysend(cas->ptxrx); bool done = ch->_trysend(cas->ptx());
if (done) { if (done) {
g->which = &_sel_txrx_prepoll_won; // !NULL not to let already queued cases win g->which = &_sel_txrx_prepoll_won; // !NULL not to let already queued cases win
return n; return n;
...@@ -1095,7 +1110,7 @@ static int __chanselect2(const _selcase *casev, int casec, const vector<int>& nv ...@@ -1095,7 +1110,7 @@ static int __chanselect2(const _selcase *casev, int casec, const vector<int>& nv
_RecvSendWaiting *w = &waitv[waitc++]; _RecvSendWaiting *w = &waitv[waitc++];
w->init(g, ch); w->init(g, ch);
w->pdata = cas->ptxrx; w->pdata = (void *)cas->ptx();
w->ok = false; w->ok = false;
w->sel_n = n; w->sel_n = n;
...@@ -1104,7 +1119,7 @@ static int __chanselect2(const _selcase *casev, int casec, const vector<int>& nv ...@@ -1104,7 +1119,7 @@ static int __chanselect2(const _selcase *casev, int casec, const vector<int>& nv
// recv // recv
else if (cas->op == _CHANRECV) { else if (cas->op == _CHANRECV) {
bool ok, done = ch->_tryrecv(cas->ptxrx, &ok); bool ok, done = ch->_tryrecv(cas->prx(), &ok);
if (done) { if (done) {
g->which = &_sel_txrx_prepoll_won; // !NULL not to let already queued cases win g->which = &_sel_txrx_prepoll_won; // !NULL not to let already queued cases win
if (cas->rxok != NULL) if (cas->rxok != NULL)
...@@ -1117,7 +1132,7 @@ static int __chanselect2(const _selcase *casev, int casec, const vector<int>& nv ...@@ -1117,7 +1132,7 @@ static int __chanselect2(const _selcase *casev, int casec, const vector<int>& nv
_RecvSendWaiting *w = &waitv[waitc++]; _RecvSendWaiting *w = &waitv[waitc++];
w->init(g, ch); w->init(g, ch);
w->pdata = cas->ptxrx; w->pdata = cas->prx();
w->ok = false; w->ok = false;
w->sel_n = n; w->sel_n = n;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment