Commit e5729209 authored by Kirill Smelkov's avatar Kirill Smelkov

go/*: Adapt to xnet.Networker changes wrt Listen and cancellation

See

	go123@3354b401
and

	go123@b03d65ff

The wrapping logic in LinkListener goes away because Accept from xnet
now supports cancellation.
parent 463ef9ad
...@@ -147,8 +147,8 @@ func DialLink(ctx context.Context, net xnet.Networker, addr string) (*NodeLink, ...@@ -147,8 +147,8 @@ func DialLink(ctx context.Context, net xnet.Networker, addr string) (*NodeLink,
// ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink. // ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink.
// //
// The listener accepts only those connections that pass NEO protocol handshake. // The listener accepts only those connections that pass NEO protocol handshake.
func ListenLink(net xnet.Networker, laddr string) (LinkListener, error) { func ListenLink(ctx context.Context, net xnet.Networker, laddr string) (LinkListener, error) {
rawl, err := net.Listen(laddr) rawl, err := net.Listen(ctx, laddr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -160,82 +160,30 @@ func ListenLink(net xnet.Networker, laddr string) (LinkListener, error) { ...@@ -160,82 +160,30 @@ func ListenLink(net xnet.Networker, laddr string) (LinkListener, error) {
// net.Listener and wraps them as NodeLink. // net.Listener and wraps them as NodeLink.
// //
// The listener accepts only those connections that pass NEO protocol handshake. // The listener accepts only those connections that pass NEO protocol handshake.
func NewLinkListener(inner net.Listener) LinkListener { func NewLinkListener(inner xnet.Listener) LinkListener {
l := &linkListener{ return &linkListener{l: inner}
l: inner,
acceptq: make(chan linkAccepted),
closed: make(chan struct{}),
}
go l.run()
return l
} }
// LinkListener is net.Listener adapted to return handshaked NodeLink on Accept. // LinkListener is xnet.Listener adapted to return handshaked NodeLink on Accept.
type LinkListener interface { type LinkListener interface {
// from net.Listener: // from xnet.Listener:
Close() error Close() error
Addr() net.Addr Addr() net.Addr
// Accept returns new incoming connection wrapped into NodeLink. // Accept returns new incoming connection wrapped into NodeLink.
// It accepts only those connections which pass NEO protocol handshake. // It accepts only those connections which pass NEO protocol handshake.
Accept() (*NodeLink, error) Accept(ctx context.Context) (*NodeLink, error)
} }
// linkListener implements LinkListener. // linkListener implements LinkListener.
type linkListener struct { type linkListener struct {
l net.Listener l xnet.Listener
acceptq chan linkAccepted
closed chan struct{}
}
type linkAccepted struct {
link *NodeLink
err error
}
func (l *linkListener) Close() error {
err := l.l.Close()
close(l.closed)
return err
}
func (l *linkListener) run() {
// context that cancels when listener stops
runCtx, runCancel := context.WithCancel(context.Background())
defer runCancel()
for {
// stop on close
select {
case <-l.closed:
return
default:
}
// XXX add backpressure on too much incoming connections without client .Accept ?
conn, err := l.l.Accept()
go l.accept(runCtx, conn, err)
}
} }
func (l *linkListener) accept(ctx context.Context, conn net.Conn, err error) { func (l *linkListener) Accept(ctx context.Context) (*NodeLink, error) {
link, err := l.accept1(ctx, conn, err)
select {
case l.acceptq <- linkAccepted{link, err}:
// ok
case <-l.closed:
// shutdown
if link != nil {
link.Close()
}
}
}
func (l *linkListener) accept1(ctx context.Context, conn net.Conn, err error) (*NodeLink, error) {
// XXX err ctx? // XXX err ctx?
conn, err := l.l.Accept(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -249,18 +197,5 @@ func (l *linkListener) accept1(ctx context.Context, conn net.Conn, err error) (* ...@@ -249,18 +197,5 @@ func (l *linkListener) accept1(ctx context.Context, conn net.Conn, err error) (*
return link, nil return link, nil
} }
func (l *linkListener) Accept() (*NodeLink, error) { func (l *linkListener) Close() error { return l.l.Close() }
select { func (l *linkListener) Addr() net.Addr { return l.l.Addr() }
case <-l.closed:
// we know raw listener is already closed - return proper error about it
_, err := l.l.Accept()
return nil, err
case a := <-l.acceptq:
return a.link, a.err
}
}
func (l *linkListener) Addr() net.Addr {
return l.l.Addr()
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment