Commit 33cf3113 authored by Kirill Smelkov's avatar Kirill Smelkov

sync: Move/Port sync.WorkGroup to C++/Pyx nogil

Provide sync.WorkGroup that can be used directly from C++ and Pyx/nogil codes.
Python-level sync.WorkGroup becomes a wrapper around pyx/nogil one.

Like with context (2a359791 "context: Move/Port context package to
C++/Pyx nogil"), timers (b073f6df "time: Move/Port timers to C++/Pyx
nogil") and interfaces (5a99b769 "libgolang: Start providing
interfaces") memory for on-heap allocated WorkGroup is managed
automatically.

Python-level tests should be enough to cover C++/Pyx functionality at
zero-level approximation.
parent e6788170
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
# See https://www.nexedi.com/licensing for rationale and options. # See https://www.nexedi.com/licensing for rationale and options.
"""Package sync mirrors and amends Go package sync. """Package sync mirrors and amends Go package sync.
- `WorkGroup` allows to spawn group of goroutines working on a common task(*).
- `Once` allows to execute an action only once. - `Once` allows to execute an action only once.
- `WaitGroup` allows to wait for a collection of tasks to finish. - `WaitGroup` allows to wait for a collection of tasks to finish.
- `Sema`(*) and `Mutex` provide low-level synchronization. - `Sema`(*) and `Mutex` provide low-level synchronization.
...@@ -28,6 +29,9 @@ See also https://golang.org/pkg/sync for Go sync package documentation. ...@@ -28,6 +29,9 @@ See also https://golang.org/pkg/sync for Go sync package documentation.
(*) not provided in Go version. (*) not provided in Go version.
""" """
from golang cimport error, refptr
from golang cimport context
cdef extern from "golang/sync.h" namespace "golang::sync" nogil: cdef extern from "golang/sync.h" namespace "golang::sync" nogil:
cppclass Sema: cppclass Sema:
Sema() Sema()
...@@ -45,3 +49,15 @@ cdef extern from "golang/sync.h" namespace "golang::sync" nogil: ...@@ -45,3 +49,15 @@ cdef extern from "golang/sync.h" namespace "golang::sync" nogil:
void done() void done()
void add(int delta) void add(int delta)
void wait() void wait()
# WorkGroup
cppclass _WorkGroup:
void go(...) # ... = func<error(context::Context)>
error wait()
cppclass WorkGroup (refptr[_WorkGroup]):
# WorkGroup.X = WorkGroup->X in C++.
void go "_ptr()->go" (...) # ... = func<error(context::Context)>
error wait "_ptr()->wait" ()
WorkGroup NewWorkGroup(context.Context ctx)
...@@ -23,7 +23,17 @@ from __future__ import print_function, absolute_import ...@@ -23,7 +23,17 @@ from __future__ import print_function, absolute_import
from cython cimport final from cython cimport final
from cpython cimport PyObject from cpython cimport PyObject
from golang cimport topyexc from golang cimport nil, newref, topyexc
from golang cimport context
from golang.pyx cimport runtime
ctypedef runtime._PyError* runtime_pPyError # https://github.com/cython/cython/issues/534
# internal API sync.h exposes only to sync.pyx
cdef extern from "golang/sync.h" namespace "golang::sync" nogil:
context.Context _WorkGroup_ctx(_WorkGroup *_wg)
from libcpp.cast cimport dynamic_cast
@final @final
cdef class PySema: cdef class PySema:
...@@ -124,6 +134,102 @@ cdef class PyWaitGroup: ...@@ -124,6 +134,102 @@ cdef class PyWaitGroup:
waitgroup_wait_pyexc(&pywg.wg) waitgroup_wait_pyexc(&pywg.wg)
@final
cdef class PyWorkGroup:
"""WorkGroup is a group of goroutines working on a common task.
Use .go() to spawn goroutines, and .wait() to wait for all of them to
complete, for example:
wg = WorkGroup(ctx)
wg.go(f1)
wg.go(f2)
wg.wait()
Every spawned function accepts context related to the whole work and derived
from ctx used to initialize WorkGroup, for example:
def f1(ctx):
...
Whenever a function returns error (raises exception), the work context is
canceled indicating to other spawned goroutines that they have to cancel their
work. .wait() waits for all spawned goroutines to complete and returns/raises
error, if any, from the first failed subtask.
WorkGroup is modelled after https://godoc.org/golang.org/x/sync/errgroup but
is not equal to it.
"""
cdef WorkGroup wg
cdef context.PyContext _pyctx # PyContext wrapping wg._ctx
def __init__(PyWorkGroup pywg, context.PyContext pyctx):
with nogil:
pywg.wg = workgroup_new_pyexc(pyctx.ctx)
pywg._pyctx = context.PyContext.from_ctx(_WorkGroup_ctx(pywg.wg._ptr()))
def __dealloc__(PyWorkGroup pywg):
pywg.wg = NULL
def go(PyWorkGroup pywg, f, *argv, **kw):
# run f(._pyctx, ...) via _PyCtxFunc whose operator()(ctx)
# verifies that ctx == ._pyctx.ctx and tails to pyrunf().
def pyrunf():
f(pywg._pyctx, *argv, **kw)
with nogil:
workgroup_go_pyctxfunc_pyexc(pywg.wg, pywg._pyctx.ctx, <PyObject*>pyrunf)
def wait(PyWaitGroup g):
cdef error err
with nogil:
err = workgroup_wait_pyexc(g.wg)
if err == nil:
return
# check that err is python error
cdef runtime._PyError *_pyerr = dynamic_cast[runtime_pPyError](err._ptr())
cdef runtime.PyError pyerr = newref(_pyerr)
if pyerr == nil:
# NOTE this also includes runtime.ErrPyStopped
raise AssertionError("non-python error: " + err.Error())
# reraise pyerr with original traceback
pyerr_reraise(pyerr)
# _PyCtxFunc complements PyWorkGroup.go() : it's operator()(ctx) verifies that
# ctx is expected context and further calls python function without any arguments.
# PyWorkGroup.go() arranges to use python functions that are bound to PyContext
# corresponding to ctx.
cdef extern from * nogil:
"""
using namespace golang;
struct _PyCtxFunc : pyx::runtime::PyFunc {
context::Context _ctx; // function is bound to this context
_PyCtxFunc(context::Context ctx, PyObject *pyf)
: PyFunc(pyf) {
this->_ctx = ctx;
}
// dtor - default is ok
// copy - default is ok
// WorkGroup calls f(ctx). We check that ctx is expected WorkGroup._ctx
// and call pyf() instead (which PyWorkgroup setup to be closure to call f(pywg._pyctx)).
error operator() (context::Context ctx) {
if (this->_ctx != ctx)
panic("_PyCtxFunc: called with unexpected context");
return PyFunc::operator() ();
}
};
"""
cppclass _PyCtxFunc (runtime.PyFunc):
__init__(context.Context ctx, PyObject *pyf)
error operator() ()
# ---- misc ---- # ---- misc ----
cdef nogil: cdef nogil:
...@@ -144,3 +250,13 @@ cdef nogil: ...@@ -144,3 +250,13 @@ cdef nogil:
wg.add(delta) wg.add(delta)
void waitgroup_wait_pyexc(WaitGroup *wg) except +topyexc: void waitgroup_wait_pyexc(WaitGroup *wg) except +topyexc:
wg.wait() wg.wait()
WorkGroup workgroup_new_pyexc(context.Context ctx) except +topyexc:
return NewWorkGroup(ctx)
void workgroup_go_pyctxfunc_pyexc(WorkGroup wg, context.Context ctx, PyObject *pyf) except +topyexc:
wg.go(_PyCtxFunc(ctx, pyf))
error workgroup_wait_pyexc(WorkGroup wg) except +topyexc:
return wg.wait()
void pyerr_reraise(runtime.PyError pyerr) except *:
runtime.PyErr_ReRaise(pyerr)
...@@ -96,4 +96,55 @@ void WaitGroup::wait() { ...@@ -96,4 +96,55 @@ void WaitGroup::wait() {
done.recv(); done.recv();
} }
// WorkGroup
_WorkGroup::_WorkGroup() {}
_WorkGroup::~_WorkGroup() {}
void _WorkGroup::decref() {
if (__decref())
delete this;
}
WorkGroup NewWorkGroup(context::Context ctx) {
WorkGroup g = adoptref(new _WorkGroup());
tie(g->_ctx, g->_cancel) = context::with_cancel(ctx);
return g;
}
void _WorkGroup::go(func<error(context::Context)> f) {
// NOTE = refptr<_WorkGroup> because we pass ref to g to spawned worker.
WorkGroup g = newref(this);
g->_wg.add(1);
golang::go([g, f]() { // NOTE g ref passed to spawned worker
defer([&]() {
g->_wg.done();
});
error err = f(g->_ctx); // TODO consider also propagating panic
if (err == NULL)
return;
g->_mu.lock();
defer([&]() {
g->_mu.unlock();
});
if (g->_err == NULL) {
// this goroutine is the first failed task
g->_err = err;
g->_cancel();
}
});
}
error _WorkGroup::wait() {
_WorkGroup& g = *this;
g._wg.wait();
g._cancel();
return g._err;
}
}} // golang::sync:: }} // golang::sync::
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
// Package sync mirrors and amends Go package sync. // Package sync mirrors and amends Go package sync.
// //
// - `WorkGroup` allows to spawn group of goroutines working on a common task(*).
// - `Once` allows to execute an action only once. // - `Once` allows to execute an action only once.
// - `WaitGroup` allows to wait for a collection of tasks to finish. // - `WaitGroup` allows to wait for a collection of tasks to finish.
// - `Sema`(*) and `Mutex` provide low-level synchronization. // - `Sema`(*) and `Mutex` provide low-level synchronization.
...@@ -38,6 +39,7 @@ ...@@ -38,6 +39,7 @@
// (*) not provided in Go version. // (*) not provided in Go version.
#include <golang/libgolang.h> #include <golang/libgolang.h>
#include <golang/context.h>
// ---- C-level API ---- // ---- C-level API ----
...@@ -137,6 +139,72 @@ private: ...@@ -137,6 +139,72 @@ private:
WaitGroup(WaitGroup&&); // don't move WaitGroup(WaitGroup&&); // don't move
}; };
// WorkGroup is a group of goroutines working on a common task.
//
// Use .go() to spawn goroutines, and .wait() to wait for all of them to
// complete, for example:
//
// sync::WorkGroup wg = sync::NewWorkGroup(ctx);
// wg->go(f1);
// wg->go(f2);
// error err = wg->wait();
//
// Every spawned function accepts context related to the whole work and derived
// from ctx used to initialize WorkGroup, for example:
//
// error f1(context::Context ctx) {
// ...
// }
//
// Whenever a function returns error, the work context is canceled indicating
// to other spawned goroutines that they have to cancel their work. .wait()
// waits for all spawned goroutines to complete and returns error, if any, from
// the first failed subtask.
//
// NOTE if spawned function panics, the panic is currently _not_ propagated to .wait().
//
// WorkGroup is modelled after https://godoc.org/golang.org/x/sync/errgroup but
// is not equal to it.
typedef refptr<class _WorkGroup> WorkGroup;
class _WorkGroup : public object {
context::Context _ctx;
func<void()> _cancel;
WaitGroup _wg;
Mutex _mu;
error _err;
// don't new - create only via NewWorkGroup()
private:
_WorkGroup();
~_WorkGroup();
friend WorkGroup NewWorkGroup(context::Context ctx);
public:
LIBGOLANG_API void decref();
public:
LIBGOLANG_API void go(func<error(context::Context)> f);
LIBGOLANG_API error wait();
private:
_WorkGroup(const _WorkGroup&); // don't copy
_WorkGroup(_WorkGroup&&); // don't move
// internal API used by sync.pyx
friend inline context::Context _WorkGroup_ctx(_WorkGroup *_wg);
};
// NewWorkGroup creates new WorkGroup working under ctx.
//
// See WorkGroup documentation for details.
LIBGOLANG_API WorkGroup NewWorkGroup(context::Context ctx);
// sync.pyx uses WorkGroup._ctx directly for efficiency.
#ifdef _LIBGOLANG_SYNC_INTERNAL_API
inline context::Context _WorkGroup_ctx(_WorkGroup *_wg) {
return _wg->_ctx;
}
#endif
}} // golang::sync:: }} // golang::sync::
#endif // __cplusplus #endif // __cplusplus
......
...@@ -41,70 +41,5 @@ from golang._sync import \ ...@@ -41,70 +41,5 @@ from golang._sync import \
PySema as Sema, \ PySema as Sema, \
PyMutex as Mutex, \ PyMutex as Mutex, \
PyOnce as Once, \ PyOnce as Once, \
PyWaitGroup as WaitGroup \ PyWaitGroup as WaitGroup, \
PyWorkGroup as WorkGroup
# WorkGroup is a group of goroutines working on a common task.
#
# Use .go() to spawn goroutines, and .wait() to wait for all of them to
# complete, for example:
#
# wg = WorkGroup(ctx)
# wg.go(f1)
# wg.go(f2)
# wg.wait()
#
# Every spawned function accepts context related to the whole work and derived
# from ctx used to initialize WorkGroup, for example:
#
# def f1(ctx):
# ...
#
# Whenever a function returns error (raises exception), the work context is
# canceled indicating to other spawned goroutines that they have to cancel their
# work. .wait() waits for all spawned goroutines to complete and returns/raises
# error, if any, from the first failed subtask.
#
# WorkGroup is modelled after https://godoc.org/golang.org/x/sync/errgroup but
# is not equal to it.
class WorkGroup(object):
def __init__(g, ctx):
g._ctx, g._cancel = context.with_cancel(ctx)
g._wg = WaitGroup()
g._mu = Mutex()
g._err = None
def go(g, f, *argv, **kw):
g._wg.add(1)
go(lambda: g._run(f, *argv, **kw))
@func
def _run(g, f, *argv, **kw):
defer(g._wg.done)
try:
f(g._ctx, *argv, **kw)
except:
_, exc, tb = sys.exc_info()
with g._mu:
if g._err is None:
# this goroutine is the first failed task
g._err = exc
if six.PY2:
# py3 has __traceback__ automatically
exc.__traceback__ = tb
g._cancel()
exc = None
tb = None
def wait(g):
g._wg.wait()
g._cancel()
if g._err is not None:
# reraise the exception so that original traceback is there
if six.PY3:
raise g._err
else:
six.reraise(g._err, None, g._err.__traceback__)
...@@ -26,7 +26,7 @@ from pytest import raises ...@@ -26,7 +26,7 @@ from pytest import raises
from golang.golang_test import import_pyx_tests, panics from golang.golang_test import import_pyx_tests, panics
from golang.time_test import dt from golang.time_test import dt
from six.moves import range as xrange from six.moves import range as xrange
import six import sys, six
import_pyx_tests("golang._sync_test") import_pyx_tests("golang._sync_test")
...@@ -142,6 +142,13 @@ def test_waitgroup(): ...@@ -142,6 +142,13 @@ def test_waitgroup():
wg.done() wg.done()
# PyErr_Restore_traceback_ok indicates whether python exceptions are restored with correct traceback.
# It is always the case for CPython, but PyPy < 7.3 had a bug:
# https://bitbucket.org/pypy/pypy/issues/3120/pyerr_restore-does-not-restore-traceback
PyErr_Restore_traceback_ok = True
if 'PyPy' in sys.version and sys.pypy_version_info < (7,3):
PyErr_Restore_traceback_ok = False
def test_workgroup(): def test_workgroup():
ctx, cancel = context.with_cancel(context.background()) ctx, cancel = context.with_cancel(context.background())
mu = sync.Mutex() mu = sync.Mutex()
...@@ -187,8 +194,9 @@ def test_workgroup(): ...@@ -187,8 +194,9 @@ def test_workgroup():
wg.wait() wg.wait()
assert exc.type is MyError assert exc.type is MyError
assert exc.value.args == ('aaa',) assert exc.value.args == ('aaa',)
assert 'Iam__' in exc.traceback[-1].locals if PyErr_Restore_traceback_ok:
assert 'Iam_f' in exc.traceback[-2].locals assert 'Iam__' in exc.traceback[-1].locals
assert 'Iam_f' in exc.traceback[-2].locals
assert l == [1, 2] assert l == [1, 2]
# t1=fail, t2=wait cancel, fail # t1=fail, t2=wait cancel, fail
...@@ -213,8 +221,9 @@ def test_workgroup(): ...@@ -213,8 +221,9 @@ def test_workgroup():
wg.wait() wg.wait()
assert exc.type is MyError assert exc.type is MyError
assert exc.value.args == ('bbb',) assert exc.value.args == ('bbb',)
assert 'Iam__' in exc.traceback[-1].locals if PyErr_Restore_traceback_ok:
assert 'Iam_f' in exc.traceback[-2].locals assert 'Iam__' in exc.traceback[-1].locals
assert 'Iam_f' in exc.traceback[-2].locals
assert l == [1, 2] assert l == [1, 2]
......
...@@ -247,7 +247,9 @@ setup( ...@@ -247,7 +247,9 @@ setup(
['golang/_context.pyx']), ['golang/_context.pyx']),
Ext('golang._sync', Ext('golang._sync',
['golang/_sync.pyx']), ['golang/_sync.pyx'],
dsos = ['golang.runtime.libpyxruntime'],
define_macros = [('_LIBGOLANG_SYNC_INTERNAL_API', None)]),
Ext('golang._sync_test', Ext('golang._sync_test',
['golang/_sync_test.pyx']), ['golang/_sync_test.pyx']),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment