Commit 0f7357ec authored by Kirill Smelkov's avatar Kirill Smelkov

go/*: Adapt to xnet.Networker changes wrt Listen and cancellation

See

	kirr/go123@3354b401
and

	kirr/go123@b03d65ff

The wrapping logic in LinkListener and requireIdentifyHello goes away
because Accept from xnet now supports cancellation.
parent d58a606f
......@@ -24,7 +24,6 @@ import (
"errors"
"fmt"
"io/ioutil"
stdnet "net"
"net/url"
"os"
"os/exec"
......@@ -218,8 +217,8 @@ type NEOGoSrv struct {
cancel func() // to stop spawned nodes
serveWG *xsync.WorkGroup // services are spawned under serveWG
Ml stdnet.Listener // M listens here
Sl stdnet.Listener // S listens here
Ml xnet.Listener // M listens here
Sl xnet.Listener // S listens here
M *Master // M service
S *Storage // S service
Sback *bsqlite.Backend // S backend
......@@ -249,8 +248,8 @@ func StartNEOGoSrv(opt NEOSrvOptions) (_ *NEOGoSrv, err error) {
net := xnet.NetPlain("tcp") // FIXME
n.Ml, err = net.Listen(""); if err != nil { return nil, err }
n.Sl, err = net.Listen(""); if err != nil { return nil, err }
n.Ml, err = net.Listen(ctx, ""); if err != nil { return nil, err }
n.Sl, err = net.Listen(ctx, ""); if err != nil { return nil, err }
n.M = NewMaster(opt.name, net)
......
......@@ -25,10 +25,10 @@ import (
"flag"
"fmt"
"io"
stdnet "net"
"os"
"lab.nexedi.com/kirr/go123/prog"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo"
)
......@@ -70,7 +70,7 @@ func masterMain(argv []string) {
ctx := context.Background()
err = listenAndServe(ctx, net, *bind, func(ctx context.Context, l stdnet.Listener) error {
err = listenAndServe(ctx, net, *bind, func(ctx context.Context, l xnet.Listener) error {
master := neo.NewMaster(*cluster, net)
return master.Run(ctx, l)
})
......
......@@ -24,7 +24,6 @@ import (
"bytes"
"context"
"flag"
stdnet "net"
"net/http"
"io"
"fmt"
......@@ -103,8 +102,8 @@ func neoMatch(r io.Reader) bool {
// default HTTP mux.
//
// Default HTTP mux can be assumed to contain /debug/pprof and the like.
func listenAndServe(ctx context.Context, net xnet.Networker, laddr string, serve func(ctx context.Context, l stdnet.Listener) error) error {
l, err := net.Listen(laddr)
func listenAndServe(ctx context.Context, net xnet.Networker, laddr string, serve func(ctx context.Context, l xnet.Listener) error) error {
l, err := net.Listen(ctx, laddr)
if err != nil {
return err
}
......@@ -114,7 +113,7 @@ func listenAndServe(ctx context.Context, net xnet.Networker, laddr string, serve
log.Infof(ctx, "listening at %s ...", l.Addr())
log.Flush() // XXX ok?
mux := cmux.New(l)
mux := cmux.New(xnet.BindCtxL(l, ctx))
neoL := mux.Match(neoMatch)
httpL := mux.Match(cmux.HTTP1(), cmux.HTTP2()) // XXX verify http2 works
miscL := mux.Match(cmux.Any())
......@@ -127,7 +126,7 @@ func listenAndServe(ctx context.Context, net xnet.Networker, laddr string, serve
})
wg.Go(func(ctx context.Context) error {
return serve(ctx, neoL)
return serve(ctx, xnet.WithCtxL(neoL))
})
wg.Go(func(ctx context.Context) error {
......
......@@ -24,13 +24,13 @@ import (
"context"
"flag"
"fmt"
stdnet "net"
"io"
"os"
"runtime"
"strings"
"lab.nexedi.com/kirr/go123/prog"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/storage"
......@@ -110,7 +110,7 @@ func storageMain(argv []string) {
prog.Fatal(err)
}
err = listenAndServe(ctx, net, *bind, func(ctx context.Context, l stdnet.Listener) error {
err = listenAndServe(ctx, net, *bind, func(ctx context.Context, l xnet.Listener) error {
stor := neo.NewStorage(*cluster, master, net, back)
return stor.Run(ctx, l)
})
......
......@@ -24,7 +24,6 @@ import (
"context"
stderrors "errors"
"fmt"
stdnet "net"
"sync"
"time"
......@@ -130,7 +129,7 @@ func (m *Master) setClusterState(state proto.ClusterState) {
// Run starts master node and runs it until ctx is cancelled or fatal error.
//
// The master will be serving incoming connections on l.
func (m *Master) Run(ctx context.Context, l stdnet.Listener) (err error) {
func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
addr := l.Addr()
defer task.Runningf(&ctx, "master(%v)", addr)(&err)
......
......@@ -173,90 +173,22 @@ type Listener interface {
// requireIdentifyHello wraps inner LinkListener into ^^^ Listener.
func requireIdentifyHello(inner neonet.LinkListener) Listener {
l := &listener{
l: inner,
acceptq: make(chan accepted),
closed: make(chan struct{}),
}
go l.run()
return l
return &listener{l: inner}
}
type listener struct {
l neonet.LinkListener
acceptq chan accepted
closed chan struct {}
}
type accepted struct {
req *neonet.Request
idReq *proto.RequestIdentification
err error
}
func (l *listener) Close() error {
err := l.l.Close()
close(l.closed)
return err
}
func (l *listener) run() {
for {
// stop on close
select {
case <-l.closed:
return
default:
}
// XXX add backpressure on too much incoming connections without client .Accept ?
// XXX do not let err go to .accept() - handle here? (but here
// we do not know with which severety and context to log)
link, err := l.l.Accept()
go l.accept(link, err)
}
}
func (l *listener) accept(link *neonet.NodeLink, err error) {
res := make(chan accepted, 1)
go func() {
req, idReq, err := l.accept1(context.Background(), link, err) // XXX ctx cancel on l close?
res <- accepted{req, idReq, err}
}()
// wait for accept1 result & resend it to .acceptq
// close link in case of listening cancel or error
//
// the only case when link stays alive is when acceptance was
// successful and link ownership is passed to Accept.
ok := false
select {
case <-l.closed:
case a := <-res:
select {
case l.acceptq <- a:
ok = (a.err == nil)
case <-l.closed:
}
}
if !ok && link != nil {
link.Close()
}
}
func (l *listener) accept1(ctx context.Context, link *neonet.NodeLink, err0 error) (_ *neonet.Request, _ *proto.RequestIdentification, err error) {
if err0 != nil {
return nil, nil, err0
func (l *listener) Accept(ctx context.Context) (_ *neonet.Request, _ *proto.RequestIdentification, err error) {
link, err := l.l.Accept(ctx)
if err != nil {
return nil, nil, err
}
defer xerr.Context(&err, "identify") // XXX -> task.ErrContext?
// identify peer
// the first conn must come with RequestIdentification packet
defer xerr.Context(&err, "identify") // XXX -> task.ErrContext?
req, err := link.Recv1(/*ctx*/)
if err != nil {
return nil, nil, err
......@@ -272,24 +204,8 @@ func (l *listener) accept1(ctx context.Context, link *neonet.NodeLink, err0 erro
return nil, nil, emsg
}
func (l *listener) Accept(ctx context.Context) (*neonet.Request, *proto.RequestIdentification, error) {
select{
case <-l.closed:
// we know raw listener is already closed - return proper error about it
_, err := l.l.Accept()
return nil, nil, err
case <-ctx.Done():
return nil, nil, ctx.Err()
case a := <-l.acceptq:
return a.req, a.idReq, a.err
}
}
func (l *listener) Addr() net.Addr {
return l.l.Addr()
}
func (l *listener) Close() error { return l.l.Close() }
func (l *listener) Addr() net.Addr { return l.l.Addr() }
// ----------------------------------------
......
......@@ -147,8 +147,8 @@ func DialLink(ctx context.Context, net xnet.Networker, addr string) (*NodeLink,
// ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink.
//
// The listener accepts only those connections that pass NEO protocol handshake.
func ListenLink(net xnet.Networker, laddr string) (LinkListener, error) {
rawl, err := net.Listen(laddr)
func ListenLink(ctx context.Context, net xnet.Networker, laddr string) (LinkListener, error) {
rawl, err := net.Listen(ctx, laddr)
if err != nil {
return nil, err
}
......@@ -160,82 +160,30 @@ func ListenLink(net xnet.Networker, laddr string) (LinkListener, error) {
// net.Listener and wraps them as NodeLink.
//
// The listener accepts only those connections that pass NEO protocol handshake.
func NewLinkListener(inner net.Listener) LinkListener {
l := &linkListener{
l: inner,
acceptq: make(chan linkAccepted),
closed: make(chan struct{}),
}
go l.run()
return l
func NewLinkListener(inner xnet.Listener) LinkListener {
return &linkListener{l: inner}
}
// LinkListener is net.Listener adapted to return handshaked NodeLink on Accept.
// LinkListener is xnet.Listener adapted to return handshaked NodeLink on Accept.
type LinkListener interface {
// from net.Listener:
// from xnet.Listener:
Close() error
Addr() net.Addr
// Accept returns new incoming connection wrapped into NodeLink.
// It accepts only those connections which pass NEO protocol handshake.
Accept() (*NodeLink, error)
Accept(ctx context.Context) (*NodeLink, error)
}
// linkListener implements LinkListener.
type linkListener struct {
l net.Listener
acceptq chan linkAccepted
closed chan struct{}
}
type linkAccepted struct {
link *NodeLink
err error
}
func (l *linkListener) Close() error {
err := l.l.Close()
close(l.closed)
return err
}
func (l *linkListener) run() {
// context that cancels when listener stops
runCtx, runCancel := context.WithCancel(context.Background())
defer runCancel()
for {
// stop on close
select {
case <-l.closed:
return
default:
}
// XXX add backpressure on too much incoming connections without client .Accept ?
conn, err := l.l.Accept()
go l.accept(runCtx, conn, err)
}
l xnet.Listener
}
func (l *linkListener) accept(ctx context.Context, conn net.Conn, err error) {
link, err := l.accept1(ctx, conn, err)
select {
case l.acceptq <- linkAccepted{link, err}:
// ok
case <-l.closed:
// shutdown
if link != nil {
link.Close()
}
}
}
func (l *linkListener) accept1(ctx context.Context, conn net.Conn, err error) (*NodeLink, error) {
func (l *linkListener) Accept(ctx context.Context) (*NodeLink, error) {
// XXX err ctx?
conn, err := l.l.Accept(ctx)
if err != nil {
return nil, err
}
......@@ -249,18 +197,5 @@ func (l *linkListener) accept1(ctx context.Context, conn net.Conn, err error) (*
return link, nil
}
func (l *linkListener) Accept() (*NodeLink, error) {
select {
case <-l.closed:
// we know raw listener is already closed - return proper error about it
_, err := l.l.Accept()
return nil, err
case a := <-l.acceptq:
return a.link, a.err
}
}
func (l *linkListener) Addr() net.Addr {
return l.l.Addr()
}
func (l *linkListener) Close() error { return l.l.Close() }
func (l *linkListener) Addr() net.Addr { return l.l.Addr() }
......@@ -23,7 +23,6 @@ package neo
import (
"context"
"fmt"
stdnet "net"
"sync"
"time"
......@@ -83,7 +82,7 @@ func NewStorage(clusterName, masterAddr string, net xnet.Networker, back storage
// commands it to shutdown.
//
// The storage will be serving incoming connections on l.
func (stor *Storage) Run(ctx context.Context, l stdnet.Listener) (err error) {
func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
addr := l.Addr()
defer task.Runningf(&ctx, "storage(%v)", addr)(&err)
......
// Copyright (C) 2017-2018 Nexedi SA and Contributors.
// Copyright (C) 2017-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -219,7 +219,7 @@ func tNewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker,
}
func (s *tStorage) Run(ctx context.Context) error {
l, err := s.node.Net.Listen(s.serveAddr)
l, err := s.node.Net.Listen(ctx, s.serveAddr)
if err != nil {
return err
}
......@@ -247,7 +247,7 @@ func tNewMaster(clusterName, serveAddr string, net xnet.Networker) *tMaster {
}
func (m *tMaster) Run(ctx context.Context) error {
l, err := m.node.Net.Listen(m.serveAddr)
l, err := m.node.Net.Listen(ctx, m.serveAddr)
if err != nil {
return err
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment