Commit 88aa51ec authored by Kirill Smelkov's avatar Kirill Smelkov

X goodbye xsync

There it was errgroup.Group adjusted with Gox for functions with
exceptions, but exceptions usage should be constrained to tests only and
it is easy to do with just Go(exc.Funcx(...)) explicitly or via local
gox function as syntax sugar.
parent b9bd0b73
......@@ -30,12 +30,13 @@ import (
"testing"
"time"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/xsync"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/zodb"
"github.com/kylelemons/godebug/pretty"
"github.com/pkg/errors"
)
......@@ -78,6 +79,10 @@ func xhandshake(ctx context.Context, c net.Conn, version uint32) {
exc.Raiseif(err)
}
func gox(wg interface { Go(func() error) }, xf func()) {
wg.Go(exc.Funcx(xf))
}
// xlinkError verifies that err is *LinkError and returns err.Err
func xlinkError(err error) error {
le, ok := err.(*LinkError)
......@@ -175,8 +180,8 @@ func TestNodeLink(t *testing.T) {
//println("000")
// Close vs recvPkt
nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg := &xsync.WorkGroup{}
wg.Gox(func() {
wg := &errgroup.Group{}
gox(wg, func() {
tdelay()
xclose(nl1)
})
......@@ -189,8 +194,8 @@ func TestNodeLink(t *testing.T) {
// Close vs sendPkt
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &xsync.WorkGroup{}
wg.Gox(func() {
wg = &errgroup.Group{}
gox(wg, func() {
tdelay()
xclose(nl1)
})
......@@ -204,8 +209,8 @@ func TestNodeLink(t *testing.T) {
// {Close,CloseAccept} vs Accept
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &xsync.WorkGroup{}
wg.Gox(func() {
wg = &errgroup.Group{}
gox(wg, func() {
tdelay()
xclose(nl2)
})
......@@ -213,7 +218,7 @@ func TestNodeLink(t *testing.T) {
if !(c == nil && xlinkError(err) == ErrLinkClosed) {
t.Fatalf("NodeLink.Accept() after close: conn = %v, err = %v", c, err)
}
wg.Gox(func() {
gox(wg, func() {
tdelay()
nl1.CloseAccept()
})
......@@ -233,8 +238,8 @@ func TestNodeLink(t *testing.T) {
//println("111")
// Close vs recvPkt on another side
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &xsync.WorkGroup{}
wg.Gox(func() {
wg = &errgroup.Group{}
gox(wg, func() {
tdelay()
xclose(nl2)
})
......@@ -247,8 +252,8 @@ func TestNodeLink(t *testing.T) {
// Close vs sendPkt on another side
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &xsync.WorkGroup{}
wg.Gox(func() {
wg = &errgroup.Group{}
gox(wg, func() {
tdelay()
xclose(nl2)
})
......@@ -265,15 +270,15 @@ func TestNodeLink(t *testing.T) {
// raw exchange
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg, ctx := xsync.WorkGroupCtx(context.Background())
wg.Gox(func() {
wg, ctx := errgroup.WithContext(context.Background())
gox(wg, func() {
// send ping; wait for pong
pkt := _mkpkt(1, 2, []byte("ping"))
xsendPkt(nl1, pkt)
pkt = xrecvPkt(nl1)
xverifyPkt(pkt, 3, 4, []byte("pong"))
})
wg.Gox(func() {
gox(wg, func() {
// wait for ping; send pong
pkt = xrecvPkt(nl2)
xverifyPkt(pkt, 1, 2, []byte("ping"))
......@@ -282,8 +287,8 @@ func TestNodeLink(t *testing.T) {
})
// close nodelinks either when checks are done, or upon first error
wgclose := &xsync.WorkGroup{}
wgclose.Gox(func() {
wgclose := &errgroup.Group{}
gox(wgclose, func() {
<-ctx.Done()
xclose(nl1)
xclose(nl2)
......@@ -299,8 +304,8 @@ func TestNodeLink(t *testing.T) {
// Close vs recvPkt
nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
c = xnewconn(nl1)
wg = &xsync.WorkGroup{}
wg.Gox(func() {
wg = &errgroup.Group{}
gox(wg, func() {
tdelay()
xclose(c)
})
......@@ -317,8 +322,8 @@ func TestNodeLink(t *testing.T) {
// Close vs sendPkt
nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
c = xnewconn(nl1)
wg = &xsync.WorkGroup{}
wg.Gox(func() {
wg = &errgroup.Group{}
gox(wg, func() {
tdelay()
xclose(c)
})
......@@ -334,14 +339,14 @@ func TestNodeLink(t *testing.T) {
// NodeLink.Close vs Conn.sendPkt/recvPkt
c11 := xnewconn(nl1)
c12 := xnewconn(nl1)
wg = &xsync.WorkGroup{}
wg.Gox(func() {
wg = &errgroup.Group{}
gox(wg, func() {
pkt, err := c11.recvPkt()
if !(pkt == nil && xconnError(err) == ErrLinkClosed) {
exc.Raisef("Conn.recvPkt() after NodeLink close: pkt = %v err = %v", pkt, err)
}
})
wg.Gox(func() {
gox(wg, func() {
pkt := c12.mkpkt(0, []byte("data"))
err := c12.sendPkt(pkt)
if xconnError(err) != ErrLinkClosed {
......@@ -362,9 +367,9 @@ func TestNodeLink(t *testing.T) {
c21 := xnewconn(nl2)
c22 := xnewconn(nl2)
c23 := xnewconn(nl2)
wg = &xsync.WorkGroup{}
wg = &errgroup.Group{}
var errRecv error
wg.Gox(func() {
gox(wg, func() {
pkt, err := c21.recvPkt()
want1 := io.EOF // if recvPkt wakes up due to peer close
want2 := io.ErrClosedPipe // if recvPkt wakes up due to sendPkt wakes up first and closes nl1
......@@ -375,7 +380,7 @@ func TestNodeLink(t *testing.T) {
errRecv = cerr
})
wg.Gox(func() {
gox(wg, func() {
pkt := c22.mkpkt(0, []byte("data"))
err := c22.sendPkt(pkt)
want := io.ErrClosedPipe // always this in both due to peer close or recvPkt waking up and closing nl2
......@@ -384,7 +389,7 @@ func TestNodeLink(t *testing.T) {
}
})
wg.Gox(func() {
gox(wg, func() {
conn, err := nl2.Accept()
if !(conn == nil && xlinkError(err) == ErrLinkDown) {
exc.Raisef("Accept after peer NodeLink shutdown: conn = %v err = %v", conn, err)
......@@ -484,9 +489,9 @@ func TestNodeLink(t *testing.T) {
// Conn accept + exchange
nl1, nl2 = nodeLinkPipe()
nl1.CloseAccept()
wg = &xsync.WorkGroup{}
wg = &errgroup.Group{}
closed := make(chan int)
wg.Gox(func() {
gox(wg, func() {
c := xaccept(nl2)
pkt := xrecvPkt(c)
......@@ -604,7 +609,7 @@ func TestNodeLink(t *testing.T) {
// test 2 channels with replies coming in reversed time order
nl1, nl2 = nodeLinkPipe()
wg = &xsync.WorkGroup{}
wg = &errgroup.Group{}
replyOrder := map[uint16]struct { // "order" in which to process requests
start chan struct{} // processing starts when start chan is ready
next uint16 // after processing this switch to next
......@@ -614,11 +619,11 @@ func TestNodeLink(t *testing.T) {
}
close(replyOrder[2].start)
wg.Gox(func() {
gox(wg, func() {
for _ = range replyOrder {
c := xaccept(nl2)
wg.Gox(func() {
gox(wg, func() {
pkt := xrecvPkt(c)
n := ntoh16(pkt.Header().MsgCode)
x := replyOrder[n]
......@@ -662,11 +667,11 @@ func TestHandshake(t *testing.T) {
bg := context.Background()
// handshake ok
p1, p2 := net.Pipe()
wg := &xsync.WorkGroup{}
wg.Gox(func() {
wg := &errgroup.Group{}
gox(wg, func() {
xhandshake(bg, p1, 1)
})
wg.Gox(func() {
gox(wg, func() {
xhandshake(bg, p2, 1)
})
xwait(wg)
......@@ -676,11 +681,11 @@ func TestHandshake(t *testing.T) {
// version mismatch
p1, p2 = net.Pipe()
var err1, err2 error
wg = &xsync.WorkGroup{}
wg.Gox(func() {
wg = &errgroup.Group{}
gox(wg, func() {
err1 = handshake(bg, p1, 1)
})
wg.Gox(func() {
gox(wg, func() {
err2 = handshake(bg, p2, 2)
})
xwait(wg)
......@@ -700,11 +705,11 @@ func TestHandshake(t *testing.T) {
// tx & rx problem
p1, p2 = net.Pipe()
err1, err2 = nil, nil
wg = &xsync.WorkGroup{}
wg.Gox(func() {
wg = &errgroup.Group{}
gox(wg, func() {
err1 = handshake(bg, p1, 1)
})
wg.Gox(func() {
gox(wg, func() {
xclose(p2)
})
xwait(wg)
......@@ -719,7 +724,7 @@ func TestHandshake(t *testing.T) {
// ctx cancel
p1, p2 = net.Pipe()
ctx, cancel := context.WithCancel(bg)
wg.Gox(func() {
gox(wg, func() {
err1 = handshake(ctx, p1, 1)
})
tdelay()
......@@ -770,9 +775,9 @@ func TestRecv1Mode(t *testing.T) {
//println("000")
// Send1
nl1, nl2 := nodeLinkPipe()
wg := &xsync.WorkGroup{}
wg := &errgroup.Group{}
sync := make(chan int)
wg.Gox(func() {
gox(wg, func() {
defer func() {
if e := recover(); e != nil {
panic(e)
......@@ -822,8 +827,8 @@ func TestRecv1Mode(t *testing.T) {
//println("111\n")
// Recv1: further packets with same connid are rejected with "connection closed"
wg = &xsync.WorkGroup{}
wg.Gox(func() {
wg = &errgroup.Group{}
gox(wg, func() {
c := xnewconn(nl2)
//println("aaa")
......@@ -852,11 +857,11 @@ func TestRecv1Mode(t *testing.T) {
// bug triggers under -race
func TestLightCloseVsLinkShutdown(t *testing.T) {
nl1, nl2 := nodeLinkPipe()
wg := &xsync.WorkGroup{}
wg := &errgroup.Group{}
c := xnewconn(nl1)
nl1.shutdown()
wg.Gox(func() {
gox(wg, func() {
c.lightClose()
})
......@@ -1202,13 +1207,13 @@ func benchmarkLinkRTT(b *testing.B, l1, l2 *NodeLink) {
func xlinkPipe(c1, c2 net.Conn) (*NodeLink, *NodeLink) {
var l1, l2 *NodeLink
wg := &xsync.WorkGroup{}
wg.Gox(func() {
wg := &errgroup.Group{}
gox(wg, func() {
l, err := Handshake(context.Background(), c1, LinkClient)
exc.Raiseif(err)
l1 = l
})
wg.Gox(func() {
gox(wg, func() {
l, err := Handshake(context.Background(), c2, LinkServer)
exc.Raiseif(err)
l2 = l
......
......@@ -43,7 +43,6 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
"lab.nexedi.com/kirr/neo/go/xcommon/xsync"
"lab.nexedi.com/kirr/neo/go/xcommon/xtesting"
"lab.nexedi.com/kirr/go123/exc"
......@@ -62,6 +61,10 @@ func xwait(w interface { Wait() error }) {
exc.Raiseif(err)
}
func gox(wg interface { Go(func() error) }, xf func()) {
wg.Go(exc.Funcx(xf))
}
func xfs1stor(path string) *fs1.FileStorage {
zstor, err := fs1.Open(bg, path)
exc.Raiseif(err)
......@@ -227,14 +230,14 @@ func TestMasterStorage(t *testing.T) {
Shost := xnet.NetTrace(net.Host("s"), tracer)
Chost := xnet.NetTrace(net.Host("c"), tracer)
gwg := &xsync.WorkGroup{}
gwg := &errgroup.Group{}
// start master
Mclock := &vclock{}
M := NewMaster("abc1", ":1", Mhost)
M.monotime = Mclock.monotime
Mctx, Mcancel := context.WithCancel(bg)
gwg.Gox(func() {
gox(gwg, func() {
err := M.Run(Mctx)
fmt.Println("M err: ", err)
exc.Raiseif(err)
......@@ -251,7 +254,7 @@ func TestMasterStorage(t *testing.T) {
zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs")
S := NewStorage("abc1", "m:1", ":1", Shost, zstor)
Sctx, Scancel := context.WithCancel(bg)
gwg.Gox(func() {
gox(gwg, func() {
err := S.Run(Sctx)
fmt.Println("S err: ", err)
exc.Raiseif(err)
......@@ -301,8 +304,8 @@ func TestMasterStorage(t *testing.T) {
tc.Expect(masterStartReady(M, true))
// M <- start cmd
wg := &xsync.WorkGroup{}
wg.Gox(func() {
wg := &errgroup.Group{}
gox(wg, func() {
err := M.Start()
exc.Raiseif(err)
})
......@@ -406,8 +409,8 @@ func TestMasterStorage(t *testing.T) {
// C asks M about last tid XXX better master sends it itself on new client connected
wg = &xsync.WorkGroup{}
wg.Gox(func() {
wg = &errgroup.Group{}
gox(wg, func() {
cLastTid, err := C.LastTid(bg)
exc.Raiseif(err)
......@@ -424,11 +427,11 @@ func TestMasterStorage(t *testing.T) {
xwait(wg)
// C starts loading first object ...
wg = &xsync.WorkGroup{}
wg = &errgroup.Group{}
xid1 := zodb.Xid{Oid: 1, XTid: zodb.XTid{Tid: zodb.TidMax, TidBefore: true}}
buf1, serial1, err := zstor.Load(bg, xid1)
exc.Raiseif(err)
wg.Gox(func() {
gox(wg, func() {
buf, serial, err := C.Load(bg, xid1)
exc.Raiseif(err)
......
......@@ -26,7 +26,8 @@ import (
//"log"
)
// SeqReaderAt implements buffering for a io.ReaderAt optimized for sequential access
// SeqReaderAt implements buffering for a io.ReaderAt optimized for sequential access.
//
// Both forward, backward and interleaved forward/backward access patterns are supported
//
// NOTE SeqReaderAt is not safe to use from multiple goroutines concurrently.
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package xsync provides addons to packages "sync" and "golang.org/x/sync".
package xsync
import (
"context"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/exc"
)
// WorkGroup is like x/sync/errgroup.Group but also supports exceptions
type WorkGroup struct {
errgroup.Group
}
// Gox calls the given function in a new goroutine and handles exceptions
//
// it translates exception raised, if any, to as if it was regular error
// returned for a function under Go call.
//
// see errgroup.Group.Go documentation for details on how error from spawned
// goroutines are handled group-wise.
func (g *WorkGroup) Gox(xf func()) {
g.Go(func() error {
return exc.Runx(xf)
})
}
// WorkGroupCtx returns new WorkGroup and associated context derived from ctx
// see errgroup.WithContext for semantic description and details.
func WorkGroupCtx(ctx context.Context) (*WorkGroup, context.Context) {
g, ctx := errgroup.WithContext(ctx)
return &WorkGroup{*g}, ctx
}
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package xsync
import (
"context"
"testing"
"lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/my"
)
func TestWorkGroup(t *testing.T) {
g := WorkGroup{}
g.Gox(func() {
exc.Raise(1)
})
err := g.Wait()
e, ok := err.(*exc.Error)
want := my.FuncName() + ".func1: 1"
if !(ok && e.Error() == want) {
t.Fatalf("gox:\nhave: %v\nwant: %v", err, want)
}
g2, ctx := WorkGroupCtx(context.Background())
g2.Gox(func() {
exc.Raise(2)
})
<-ctx.Done()
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment