Commit e8396003 authored by Kirill Smelkov's avatar Kirill Smelkov

go/neo/neonet: Link establishment

Implement NEO protocol handshaking and use it in newly provided DialLink
and ListenLink which correspondingly first do regular network dial or
listen and than perform the handshake on just established TCP
connection. If handshake goes ok, the result is wrapped into NodeLink.

Some history:

	lab.nexedi.com/kirr/neo/commit/8d0a1469	X Handshake draftly done

See also http://navytux.spb.ru/~kirr/neo.html#development-overview
(starting from "The neonet module also provides DialLink and ListenLink
...")
parent 64513925
...@@ -17,9 +17,11 @@ ...@@ -17,9 +17,11 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
// Package neonet provides service to exchange messages in a NEO network. // Package neonet provides service to establish links and exchange messages in
// a NEO network.
// //
// A NEO node - node // A NEO node - node link can be established with DialLink and ListenLink
// similarly to how it is done in standard package net. Once established, a
// link (NodeLink) provides service for multiplexing several communication // link (NodeLink) provides service for multiplexing several communication
// connections on top of it. Connections (Conn) in turn provide service to // connections on top of it. Connections (Conn) in turn provide service to
// exchange NEO protocol messages. // exchange NEO protocol messages.
...@@ -186,6 +188,8 @@ type ConnError struct { ...@@ -186,6 +188,8 @@ type ConnError struct {
} }
// _LinkRole is a role an end of NodeLink is intended to play. // _LinkRole is a role an end of NodeLink is intended to play.
//
// XXX _LinkRole will need to become public again if _Handshake does.
type _LinkRole int type _LinkRole int
const ( const (
_LinkServer _LinkRole = iota // link created as server _LinkServer _LinkRole = iota // link created as server
...@@ -207,6 +211,9 @@ const ( ...@@ -207,6 +211,9 @@ const (
// //
// Usually server role should be used for connections created via // Usually server role should be used for connections created via
// net.Listen/net.Accept and client role for connections created via net.Dial. // net.Listen/net.Accept and client role for connections created via net.Dial.
//
// Though it is possible to wrap just-established raw connection into NodeLink,
// users should always use Handshake which performs protocol handshaking first.
func newNodeLink(conn net.Conn, role _LinkRole) *NodeLink { func newNodeLink(conn net.Conn, role _LinkRole) *NodeLink {
var nextConnId uint32 var nextConnId uint32
switch role &^ linkFlagsMask { switch role &^ linkFlagsMask {
......
// Copyright (C) 2016-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package neonet
// link establishment
import (
"context"
"encoding/binary"
"fmt"
"io"
"net"
"sync"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo/proto"
)
// ---- Handshake ----
// XXX _Handshake may be needed to become public in case when we have already
// established raw connection and want to hand-over it to NEO. But currently we
// do not have such uses.
// _Handshake performs NEO protocol handshake just after raw connection between
// 2 nodes was established.
//
// On success raw connection is returned wrapped into NodeLink.
// On error raw connection is closed.
func _Handshake(ctx context.Context, conn net.Conn, role _LinkRole) (nl *NodeLink, err error) {
err = handshake(ctx, conn, proto.Version)
if err != nil {
return nil, err
}
// handshake ok -> NodeLink
return newNodeLink(conn, role), nil
}
// _HandshakeError is returned when there is an error while performing handshake.
type _HandshakeError struct {
LocalAddr net.Addr
RemoteAddr net.Addr
Err error
}
func (e *_HandshakeError) Error() string {
return fmt.Sprintf("%s - %s: handshake: %s", e.LocalAddr, e.RemoteAddr, e.Err.Error())
}
func handshake(ctx context.Context, conn net.Conn, version uint32) (err error) {
// XXX simplify -> errgroup
errch := make(chan error, 2)
// tx handshake word
txWg := sync.WaitGroup{}
txWg.Add(1)
go func() {
var b [4]byte
binary.BigEndian.PutUint32(b[:], version) // XXX -> hton32 ?
_, err := conn.Write(b[:])
// XXX EOF -> ErrUnexpectedEOF ?
errch <- err
txWg.Done()
}()
// rx handshake word
go func() {
var b [4]byte
_, err := io.ReadFull(conn, b[:])
if err == io.EOF {
err = io.ErrUnexpectedEOF // can be returned with n = 0
}
if err == nil {
peerVersion := binary.BigEndian.Uint32(b[:]) // XXX -> ntoh32 ?
if peerVersion != version {
err = fmt.Errorf("protocol version mismatch: peer = %08x ; our side = %08x", peerVersion, version)
}
}
errch <- err
}()
connClosed := false
defer func() {
// make sure our version is always sent on the wire, if possible,
// so that peer does not see just closed connection when on rx we see version mismatch.
//
// NOTE if cancelled tx goroutine will wake up without delay.
txWg.Wait()
// don't forget to close conn if returning with error + add handshake err context
if err != nil {
err = &_HandshakeError{conn.LocalAddr(), conn.RemoteAddr(), err}
if !connClosed {
conn.Close()
}
}
}()
for i := 0; i < 2; i++ {
select {
case <-ctx.Done():
conn.Close() // interrupt IO
connClosed = true
return ctx.Err()
case err = <-errch:
if err != nil {
return err
}
}
}
// handshaked ok
return nil
}
// ---- Dial & Listen at NodeLink level ----
// DialLink connects to address on given network, performs NEO protocol
// handshake and wraps the connection as NodeLink.
func DialLink(ctx context.Context, net xnet.Networker, addr string) (*NodeLink, error) {
peerConn, err := net.Dial(ctx, addr)
if err != nil {
return nil, err
}
return _Handshake(ctx, peerConn, _LinkClient)
}
// ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink.
//
// The listener accepts only those connections that pass NEO protocol handshake.
func ListenLink(net xnet.Networker, laddr string) (LinkListener, error) {
rawl, err := net.Listen(laddr)
if err != nil {
return nil, err
}
return NewLinkListener(rawl), nil
}
// NewLinkListener creates LinkListener which accepts connections from an inner
// net.Listener and wraps them as NodeLink.
//
// The listener accepts only those connections that pass NEO protocol handshake.
func NewLinkListener(inner net.Listener) LinkListener {
l := &linkListener{
l: inner,
acceptq: make(chan linkAccepted),
closed: make(chan struct{}),
}
go l.run()
return l
}
// LinkListener is net.Listener adapted to return handshaked NodeLink on Accept.
type LinkListener interface {
// from net.Listener:
Close() error
Addr() net.Addr
// Accept returns new incoming connection wrapped into NodeLink.
// It accepts only those connections which pass NEO protocol handshake.
Accept() (*NodeLink, error)
}
// linkListener implements LinkListener.
type linkListener struct {
l net.Listener
acceptq chan linkAccepted
closed chan struct{}
}
type linkAccepted struct {
link *NodeLink
err error
}
func (l *linkListener) Close() error {
err := l.l.Close()
close(l.closed)
return err
}
func (l *linkListener) run() {
// context that cancels when listener stops
runCtx, runCancel := context.WithCancel(context.Background())
defer runCancel()
for {
// stop on close
select {
case <-l.closed:
return
default:
}
// XXX add backpressure on too much incoming connections without client .Accept ?
conn, err := l.l.Accept()
go l.accept(runCtx, conn, err)
}
}
func (l *linkListener) accept(ctx context.Context, conn net.Conn, err error) {
link, err := l.accept1(ctx, conn, err)
select {
case l.acceptq <- linkAccepted{link, err}:
// ok
case <-l.closed:
// shutdown
if link != nil {
link.Close()
}
}
}
func (l *linkListener) accept1(ctx context.Context, conn net.Conn, err error) (*NodeLink, error) {
// XXX err ctx?
if err != nil {
return nil, err
}
// NOTE Handshake closes conn in case of failure
link, err := _Handshake(ctx, conn, _LinkServer)
if err != nil {
return nil, err
}
return link, nil
}
func (l *linkListener) Accept() (*NodeLink, error) {
select {
case <-l.closed:
// we know raw listener is already closed - return proper error about it
_, err := l.l.Accept()
return nil, err
case a := <-l.acceptq:
return a.link, a.err
}
}
func (l *linkListener) Addr() net.Addr {
return l.l.Addr()
}
// Copyright (C) 2016-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package neonet
import (
"context"
"io"
"net"
"testing"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/exc"
)
func xhandshake(ctx context.Context, c net.Conn, version uint32) {
err := handshake(ctx, c, version)
exc.Raiseif(err)
}
func TestHandshake(t *testing.T) {
bg := context.Background()
// handshake ok
p1, p2 := net.Pipe()
wg := &errgroup.Group{}
gox(wg, func() {
xhandshake(bg, p1, 1)
})
gox(wg, func() {
xhandshake(bg, p2, 1)
})
xwait(wg)
xclose(p1)
xclose(p2)
// version mismatch
p1, p2 = net.Pipe()
var err1, err2 error
wg = &errgroup.Group{}
gox(wg, func() {
err1 = handshake(bg, p1, 1)
})
gox(wg, func() {
err2 = handshake(bg, p2, 2)
})
xwait(wg)
xclose(p1)
xclose(p2)
err1Want := "pipe - pipe: handshake: protocol version mismatch: peer = 00000002 ; our side = 00000001"
err2Want := "pipe - pipe: handshake: protocol version mismatch: peer = 00000001 ; our side = 00000002"
if !(err1 != nil && err1.Error() == err1Want) {
t.Errorf("handshake ver mismatch: p1: unexpected error:\nhave: %v\nwant: %v", err1, err1Want)
}
if !(err2 != nil && err2.Error() == err2Want) {
t.Errorf("handshake ver mismatch: p2: unexpected error:\nhave: %v\nwant: %v", err2, err2Want)
}
// tx & rx problem
p1, p2 = net.Pipe()
err1, err2 = nil, nil
wg = &errgroup.Group{}
gox(wg, func() {
err1 = handshake(bg, p1, 1)
})
gox(wg, func() {
xclose(p2)
})
xwait(wg)
xclose(p1)
err11, ok := err1.(*_HandshakeError)
if !ok || !(err11.Err == io.ErrClosedPipe /* on Write */ || err11.Err == io.ErrUnexpectedEOF /* on Read */) {
t.Errorf("handshake peer close: unexpected error: %#v", err1)
}
// ctx cancel
p1, p2 = net.Pipe()
ctx, cancel := context.WithCancel(bg)
gox(wg, func() {
err1 = handshake(ctx, p1, 1)
})
tdelay()
cancel()
xwait(wg)
xclose(p1)
xclose(p2)
err11, ok = err1.(*_HandshakeError)
if !ok || !(err11.Err == context.Canceled) {
t.Errorf("handshake cancel: unexpected error: %#v", err1)
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment