Commit 71865607 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 69d5916b
......@@ -682,7 +682,7 @@ func handshake(ctx context.Context, conn net.Conn, version uint32) (err error) {
}
// ---- for convenience: Dial ----
// ---- for convenience: Dial & Listen ----
// Dial connects to address on given network, handshakes and wraps the connection as NodeLink
func Dial(ctx context.Context, net xnet.Networker, addr string) (nl *NodeLink, err error) {
......@@ -694,10 +694,93 @@ func Dial(ctx context.Context, net xnet.Networker, addr string) (nl *NodeLink, e
return Handshake(ctx, peerConn, LinkClient)
}
// NOTE there is no Listen with Handshake hooked into Accept because: Handshake
// is blocking operation and thus needs to be run in separate goroutine not to
// block further Accepts.
// Listen starts listening on laddr for incoming connections and wraps them as NodeLink.
// The listener accepts only those connections that pass handshake.
func Listen(net xnet.Networker, laddr string) (*Listener, error) {
rawl, err := net.Listen(laddr)
if err != nil {
return nil, err
}
l := &Listener{
Listener: rawl,
acceptq: make(chan accepted),
closed: make(chan struct{}),
}
go l.run()
return l, nil
}
type Listener struct {
net.Listener
acceptq chan accepted
closed chan struct {}
}
type accepted struct {
link *NodeLink
err error
}
func (l *Listener) Close() error {
err := l.Listener.Close()
close(l.closed)
return err
}
func (l *Listener) run() {
// context that cancels when listener stops
runCtx, runCancel := context.WithCancel(context.Background())
defer runCancel()
for {
// stop on close
select {
case <-l.closed:
return
default:
}
// XXX add backpressure on too much incoming connections without client .Accept
conn, err := l.Listener.Accept()
go l.accept(runCtx, conn, err)
}
}
func (l *Listener) accept(ctx context.Context, conn net.Conn, err error) {
link, err := l.accept1(ctx, conn, err)
select {
case <-l.closed:
case l.acceptq <- accepted{link, err}:
}
}
func (l *Listener) accept1(ctx context.Context, conn net.Conn, err error) (*NodeLink, error) {
if err != nil {
return nil, err
}
link, err := Handshake(ctx, conn, LinkServer)
if err != nil {
return nil, err
}
return link, nil
}
func (l *Listener) Accept() (*NodeLink, error) {
select{
case <-l.closed:
// we know raw listener is already closed - return proper error about it
_, err := l.Listener.Accept()
return nil, err
case a := <-l.acceptq:
return a.link, a.err
}
}
// ---- for convenience: Conn -> NodeLink & local/remote link addresses ----
......
......@@ -28,8 +28,6 @@ package neo
//go:generate sh -c "go run ../xcommon/tracing/cmd/gotrace/{gotrace,util}.go ."
import (
"net"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
"lab.nexedi.com/kirr/neo/go/zodb"
)
......@@ -60,9 +58,9 @@ type NodeCommon struct {
// Listen starts listening at node's listening address.
// If the address is empty one new free is automatically selected.
// The node information about where it listens at is appropriately updated.
func (n *NodeCommon) Listen() (net.Listener, error) {
func (n *NodeCommon) Listen() (*Listener, error) {
// start listening
l, err := n.Net.Listen(n.MyInfo.Address.String()) // XXX ugly
l, err := Listen(n.Net, n.MyInfo.Address.String()) // XXX ugly
if err != nil {
return nil, err // XXX err ctx
}
......
......@@ -25,7 +25,6 @@ package server
import (
"context"
"fmt"
"net"
"lab.nexedi.com/kirr/neo/go/neo"
......@@ -41,10 +40,10 @@ type Server interface {
// Serve runs service on a listener
// - accept incoming connection on the listener
// - for every accepted connection spawn handshake + srv.ServeLink() in separate goroutine.
// - for every accepted connection spawn srv.ServeLink() in separate goroutine.
//
// the listener is closed when Serve returns.
func Serve(ctx context.Context, l net.Listener, srv Server) error {
func Serve(ctx context.Context, l *neo.Listener, srv Server) error {
fmt.Printf("xxx: serving on %s ...\n", l.Addr()) // XXX 'xxx' -> ?
// close listener when either cancelling or returning (e.g. due to an error)
......@@ -62,46 +61,29 @@ func Serve(ctx context.Context, l net.Listener, srv Server) error {
l.Close() // XXX err
}()
// main Accept -> Handshake -> ServeLink loop
// main Accept -> ServeLink loop
for {
peerConn, err := l.Accept()
link, err := l.Accept()
if err != nil {
// TODO err == closed <-> ctx was cancelled
// TODO err -> net.Error && .Temporary() -> some throttling
return err
}
go func() {
link, err := neo.Handshake(ctx, peerConn, neo.LinkServer)
if err != nil {
fmt.Printf("xxx: %s\n", err)
return
}
srv.ServeLink(ctx, link)
}()
}
}
/*
// ListenAndServe listens on network address and then calls Serve to handle incoming connections
// XXX unused -> goes away ?
func ListenAndServe(ctx context.Context, net neo.Network, laddr string, srv Server) error {
l, err := net.Listen(laddr)
if err != nil {
return err
go srv.ServeLink(ctx, link)
}
// TODO set keepalive on l
return Serve(ctx, l, srv)
}
*/
// ----------------------------------------
// XXX goes away? (we need a func to make sure to recv RequestIdentification
// XXX and pass it to server main logic - whether to accept it or not should be
// XXX programmed there)
//
// IdentifyPeer identifies peer on the link
// it expects peer to send RequestIdentification packet and replies with AcceptIdentification if identification passes.
// returns information about identified node or error.
// XXX recheck identification logic here
func IdentifyPeer(link *neo.NodeLink, myNodeType neo.NodeType) (nodeInfo neo.RequestIdentification, err error) {
defer xerr.Contextf(&err, "%s: identify", link)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment