Commit e407f725 authored by Kirill Smelkov's avatar Kirill Smelkov

go/neo/neonet: Rework handshake to differentiate client and server parts

Previously we were doing handshake symmetrically: both client and server
were transmitting hello and receiving peer's hello simultaneously.
However this does not allow server to adjust its behaviour depending on
which client (protocol version, protocol encoding, ...) is connecting to it.

-> Rework handshake so that client always sends its hello first, and
only then the server side replies. This matches actual NEO/py behaviour:

https://lab.nexedi.com/nexedi/neoppod/blob/v1.12-67-g261dd4b4/neo/lib/connector.py#L293-294

even though the "NEO protocol" states that

	Handshake transmissions are not ordered with respect to each other and can go in parallel.

	( https://neo.nexedi.com/P-NEO-Protocol.Specification.2019?portal_skin=CI_slideshow#/9/2 )

If I recall correctly that sentence was authored by me in 2018 based on
previous understanding of should-be full symmetry in-between client and
server.

However soon we are going to teach server sides to autodetect client
encoding and adjust server to talk to client via its preferred way.
This needs handshake for client and server to be differentiated.

The protocol needs to be adjusted as well. However I'm not sure it is
going to happen...
parent d6f35f70
// Copyright (C) 2017-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package xcontext provides addons to std package context.
package xcontext
import (
"context"
"io"
)
// WithCloseOnErrCancel closes c on ctx cancel while f is run, or if f returns with an error.
//
// It is usually handy to propagate cancellation to interrupt IO.
func WithCloseOnErrCancel(ctx context.Context, c io.Closer, f func() error) (err error) {
closed := false
fdone := make(chan error)
defer func() {
<-fdone // wait for f to complete
if err != nil {
if !closed {
c.Close()
}
}
}()
go func() (err error) {
defer func() {
fdone <- err
close(fdone)
}()
return f()
}()
select {
case <-ctx.Done():
c.Close() // interrupt IO
closed = true
return ctx.Err()
case err := <-fdone:
return err
}
}
...@@ -234,8 +234,6 @@ type ConnError struct { ...@@ -234,8 +234,6 @@ type ConnError struct {
} }
// _LinkRole is a role an end of NodeLink is intended to play. // _LinkRole is a role an end of NodeLink is intended to play.
//
// XXX _LinkRole will need to become public again if _Handshake does.
type _LinkRole int type _LinkRole int
const ( const (
_LinkServer _LinkRole = iota // link created as server _LinkServer _LinkRole = iota // link created as server
......
// Copyright (C) 2016-2020 Nexedi SA and Contributors. // Copyright (C) 2016-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -1084,12 +1084,12 @@ func xlinkPipe(c1, c2 net.Conn) (*NodeLink, *NodeLink) { ...@@ -1084,12 +1084,12 @@ func xlinkPipe(c1, c2 net.Conn) (*NodeLink, *NodeLink) {
wg := xsync.NewWorkGroup(context.Background()) wg := xsync.NewWorkGroup(context.Background())
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
l, err := _Handshake(ctx, c1, _LinkClient) l, err := _HandshakeClient(ctx, c1)
exc.Raiseif(err) exc.Raiseif(err)
l1 = l l1 = l
}) })
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
l, err := _Handshake(ctx, c2, _LinkServer) l, err := _HandshakeServer(ctx, c2)
exc.Raiseif(err) exc.Raiseif(err)
l2 = l l2 = l
}) })
......
...@@ -26,113 +26,181 @@ import ( ...@@ -26,113 +26,181 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"sync"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/internal/xcontext"
"lab.nexedi.com/kirr/neo/go/internal/xio" "lab.nexedi.com/kirr/neo/go/internal/xio"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
) )
// ---- Handshake ---- // ---- Handshake ----
// XXX _Handshake may be needed to become public in case when we have already // XXX _Handshake{Client,Server} may be needed to become public in case when we have already
// established raw connection and want to hand-over it to NEO. But currently we // established raw connection and want to hand-over it to NEO. But currently we
// do not have such uses. // do not have such uses.
// _Handshake performs NEO protocol handshake just after raw connection between func _HandshakeClient(ctx context.Context, conn net.Conn) (*NodeLink, error) {
// 2 nodes was established. return handshakeClient(ctx, conn, proto.Version)
// }
// On success raw connection is returned wrapped into NodeLink.
// On error raw connection is closed.
func _Handshake(ctx context.Context, conn net.Conn, role _LinkRole) (nl *NodeLink, err error) {
err = handshake(ctx, conn, proto.Version)
if err != nil {
return nil, err
}
// handshake ok -> NodeLink func _HandshakeServer(ctx context.Context, conn net.Conn) (*NodeLink, error) {
return newNodeLink(conn, role), nil return handshakeServer(ctx, conn, proto.Version)
} }
// _HandshakeError is returned when there is an error while performing handshake. // _HandshakeError is returned when there is an error while performing handshake.
type _HandshakeError struct { type _HandshakeError struct {
LocalRole _LinkRole
LocalAddr net.Addr LocalAddr net.Addr
RemoteAddr net.Addr RemoteAddr net.Addr
Err error Err error
} }
func (e *_HandshakeError) Error() string { func (e *_HandshakeError) Error() string {
return fmt.Sprintf("%s - %s: handshake: %s", e.LocalAddr, e.RemoteAddr, e.Err.Error()) role := ""
switch e.LocalRole {
case _LinkServer: role = "server"
case _LinkClient: role = "client"
default: panic("bug")
}
return fmt.Sprintf("%s - %s: handshake (%s): %s", e.LocalAddr, e.RemoteAddr, role, e.Err.Error())
} }
func (e *_HandshakeError) Cause() error { return e.Err } func (e *_HandshakeError) Cause() error { return e.Err }
func (e *_HandshakeError) Unwrap() error { return e.Err } func (e *_HandshakeError) Unwrap() error { return e.Err }
func handshake(ctx context.Context, conn net.Conn, version uint32) (err error) { // handshakeClient implements client-side NEO protocol handshake just after raw
// XXX simplify -> errgroup // connection between 2 nodes was established.
errch := make(chan error, 2) //
// Client indicates its version to server.
//
// On success raw connection is returned wrapped into NodeLink.
// On error raw connection is closed.
func handshakeClient(ctx context.Context, conn net.Conn, version uint32) (*NodeLink, error) {
err := _handshakeClient(ctx, conn, version)
if err != nil {
return nil, err
}
return newNodeLink(conn, _LinkClient), nil
}
// tx handshake word // handshakeServer implements server-side NEO protocol handshake just after raw
txWg := sync.WaitGroup{} // connection between 2 nodes was established.
txWg.Add(1) //
go func() { // Server verifies that its version matches Client.
var b [4]byte //
binary.BigEndian.PutUint32(b[:], version) // XXX -> hton32 ? // On success raw connection is returned wrapped into NodeLink.
_, err := conn.Write(b[:]) // On error raw connection is closed.
// XXX EOF -> ErrUnexpectedEOF ? func handshakeServer(ctx context.Context, conn net.Conn, version uint32) (*NodeLink, error) {
errch <- err err := _handshakeServer(ctx, conn, version)
txWg.Done() if err != nil {
return nil, err
}
return newNodeLink(conn, _LinkServer), nil
}
func _handshakeClient(ctx context.Context, conn net.Conn, version uint32) (err error) {
defer func() {
if err != nil {
err = &_HandshakeError{_LinkClient, conn.LocalAddr(), conn.RemoteAddr(), err}
}
}() }()
// rx handshake word err = xcontext.WithCloseOnErrCancel(ctx, conn, func() error {
go func() { // tx client hello
var b [4]byte err := txHello("tx hello", conn, version)
_, err := io.ReadFull(conn, b[:]) if err != nil {
err = xio.NoEOF(err) // can be returned with n = 0 return err
if err == nil {
peerVersion := binary.BigEndian.Uint32(b[:]) // XXX -> ntoh32 ?
if peerVersion != version {
err = fmt.Errorf("protocol version mismatch: peer = %08x ; our side = %08x", peerVersion, version)
} }
// rx server hello reply
var peerVer uint32
peerVer, err = rxHello("rx hello reply", conn)
if err != nil {
return err
} }
errch <- err
}()
connClosed := false // verify version
defer func() { if peerVer != version {
// make sure our version is always sent on the wire, if possible, return fmt.Errorf("protocol version mismatch: peer = %08x ; our side = %08x", peerVer, version)
// so that peer does not see just closed connection when on rx we see version mismatch. }
//
// NOTE if cancelled tx goroutine will wake up without delay.
txWg.Wait()
// don't forget to close conn if returning with error + add handshake err context return nil
})
if err != nil { if err != nil {
err = &_HandshakeError{conn.LocalAddr(), conn.RemoteAddr(), err} return err
if !connClosed {
conn.Close()
} }
return nil
}
func _handshakeServer(ctx context.Context, conn net.Conn, version uint32) (err error) {
defer func() {
if err != nil {
err = &_HandshakeError{_LinkServer, conn.LocalAddr(), conn.RemoteAddr(), err}
} }
}() }()
for i := 0; i < 2; i++ { err = xcontext.WithCloseOnErrCancel(ctx, conn, func() error {
select { // rx client hello
case <-ctx.Done(): var peerVer uint32
conn.Close() // interrupt IO var err error
connClosed = true peerVer, err = rxHello("rx hello", conn)
return ctx.Err() if err != nil {
return err
}
case err = <-errch: // tx server reply
//
// do it before version check so that client can also detect "version
// mismatch" instead of just getting "disconnect".
err = txHello("tx hello reply", conn, version)
if err != nil { if err != nil {
return err return err
} }
// verify version
if peerVer != version {
return fmt.Errorf("protocol version mismatch: peer = %08x ; our side = %08x", peerVer, version)
} }
return nil
})
if err != nil {
return err
} }
// handshaked ok
return nil return nil
} }
func txHello(errctx string, conn net.Conn, version uint32) (err error) {
defer xerr.Context(&err, errctx)
var b [4]byte
binary.BigEndian.PutUint32(b[:], version) // XXX -> hton32 ?
_, err = conn.Write(b[:])
if err != nil {
return err
}
return nil
}
func rxHello(errctx string, conn net.Conn) (version uint32, err error) {
defer xerr.Context(&err, errctx)
var b [4]byte
_, err = io.ReadFull(conn, b[:])
err = xio.NoEOF(err)
if err != nil {
return 0, err
}
peerVer := binary.BigEndian.Uint32(b[:]) // XXX -> ntoh32 ?
return peerVer, nil
}
// ---- Dial & Listen at NodeLink level ---- // ---- Dial & Listen at NodeLink level ----
...@@ -144,7 +212,7 @@ func DialLink(ctx context.Context, net xnet.Networker, addr string) (*NodeLink, ...@@ -144,7 +212,7 @@ func DialLink(ctx context.Context, net xnet.Networker, addr string) (*NodeLink,
return nil, err return nil, err
} }
return _Handshake(ctx, peerConn, _LinkClient) return _HandshakeClient(ctx, peerConn)
} }
// ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink. // ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink.
...@@ -192,7 +260,7 @@ func (l *linkListener) Accept(ctx context.Context) (*NodeLink, error) { ...@@ -192,7 +260,7 @@ func (l *linkListener) Accept(ctx context.Context) (*NodeLink, error) {
} }
// NOTE Handshake closes conn in case of failure // NOTE Handshake closes conn in case of failure
link, err := _Handshake(ctx, conn, _LinkServer) link, err := _HandshakeServer(ctx, conn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
// Copyright (C) 2016-2020 Nexedi SA and Contributors. // Copyright (C) 2016-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -21,6 +21,7 @@ package neonet ...@@ -21,6 +21,7 @@ package neonet
import ( import (
"context" "context"
"errors"
"io" "io"
"net" "net"
"testing" "testing"
...@@ -29,8 +30,15 @@ import ( ...@@ -29,8 +30,15 @@ import (
"lab.nexedi.com/kirr/go123/xsync" "lab.nexedi.com/kirr/go123/xsync"
) )
func xhandshake(ctx context.Context, c net.Conn, version uint32) { // _xhandshakeClient handshakes as client.
err := handshake(ctx, c, version) func _xhandshakeClient(ctx context.Context, c net.Conn, version uint32) {
err := _handshakeClient(ctx, c, version)
exc.Raiseif(err)
}
// _xhandshakeServer handshakes as server.
func _xhandshakeServer(ctx context.Context, c net.Conn, version uint32) {
err := _handshakeServer(ctx, c, version)
exc.Raiseif(err) exc.Raiseif(err)
} }
...@@ -40,10 +48,10 @@ func TestHandshake(t *testing.T) { ...@@ -40,10 +48,10 @@ func TestHandshake(t *testing.T) {
p1, p2 := net.Pipe() p1, p2 := net.Pipe()
wg := xsync.NewWorkGroup(bg) wg := xsync.NewWorkGroup(bg)
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
xhandshake(ctx, p1, 1) _xhandshakeClient(ctx, p1, 1)
}) })
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
xhandshake(ctx, p2, 1) _xhandshakeServer(ctx, p2, 1)
}) })
xwait(wg) xwait(wg)
xclose(p1) xclose(p1)
...@@ -54,17 +62,17 @@ func TestHandshake(t *testing.T) { ...@@ -54,17 +62,17 @@ func TestHandshake(t *testing.T) {
var err1, err2 error var err1, err2 error
wg = xsync.NewWorkGroup(bg) wg = xsync.NewWorkGroup(bg)
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
err1 = handshake(ctx, p1, 1) err1 = _handshakeClient(ctx, p1, 1)
}) })
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
err2 = handshake(ctx, p2, 2) err2 = _handshakeServer(ctx, p2, 2)
}) })
xwait(wg) xwait(wg)
xclose(p1) xclose(p1)
xclose(p2) xclose(p2)
err1Want := "pipe - pipe: handshake: protocol version mismatch: peer = 00000002 ; our side = 00000001" err1Want := "pipe - pipe: handshake (client): protocol version mismatch: peer = 00000002 ; our side = 00000001"
err2Want := "pipe - pipe: handshake: protocol version mismatch: peer = 00000001 ; our side = 00000002" err2Want := "pipe - pipe: handshake (server): protocol version mismatch: peer = 00000001 ; our side = 00000002"
if !(err1 != nil && err1.Error() == err1Want) { if !(err1 != nil && err1.Error() == err1Want) {
t.Errorf("handshake ver mismatch: p1: unexpected error:\nhave: %v\nwant: %v", err1, err1Want) t.Errorf("handshake ver mismatch: p1: unexpected error:\nhave: %v\nwant: %v", err1, err1Want)
...@@ -73,12 +81,12 @@ func TestHandshake(t *testing.T) { ...@@ -73,12 +81,12 @@ func TestHandshake(t *testing.T) {
t.Errorf("handshake ver mismatch: p2: unexpected error:\nhave: %v\nwant: %v", err2, err2Want) t.Errorf("handshake ver mismatch: p2: unexpected error:\nhave: %v\nwant: %v", err2, err2Want)
} }
// tx & rx problem // tx & rx problem (client)
p1, p2 = net.Pipe() p1, p2 = net.Pipe()
err1, err2 = nil, nil var err error
wg = xsync.NewWorkGroup(bg) wg = xsync.NewWorkGroup(bg)
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
err1 = handshake(ctx, p1, 1) err = _handshakeClient(ctx, p1, 1)
}) })
gox(wg, func(_ context.Context) { gox(wg, func(_ context.Context) {
xclose(p2) xclose(p2)
...@@ -86,18 +94,34 @@ func TestHandshake(t *testing.T) { ...@@ -86,18 +94,34 @@ func TestHandshake(t *testing.T) {
xwait(wg) xwait(wg)
xclose(p1) xclose(p1)
err11, ok := err1.(*_HandshakeError) err_, ok := err.(*_HandshakeError)
if !ok || !(errors.Is(err_.Err, io.ErrClosedPipe /* on Write */) || errors.Is(err_.Err, io.ErrUnexpectedEOF /* on Read */)) {
t.Errorf("handshake peer close: unexpected error: %#v", err)
}
// tx & rx problem (server)
p1, p2 = net.Pipe()
wg = xsync.NewWorkGroup(bg)
gox(wg, func(_ context.Context) {
xclose(p1)
})
gox(wg, func(ctx context.Context) {
err = _handshakeServer(ctx, p2, 1)
})
xwait(wg)
xclose(p2)
if !ok || !(err11.Err == io.ErrClosedPipe /* on Write */ || err11.Err == io.ErrUnexpectedEOF /* on Read */) { err_, ok = err.(*_HandshakeError)
t.Errorf("handshake peer close: unexpected error: %#v", err1) if !ok || !(errors.Is(err_.Err, io.ErrClosedPipe /* on Write */) || errors.Is(err_.Err, io.ErrUnexpectedEOF /* on Read */)) {
t.Errorf("handshake peer close: unexpected error: %#v", err)
} }
// ctx cancel // ctx cancel (client)
p1, p2 = net.Pipe() p1, p2 = net.Pipe()
ctx, cancel := context.WithCancel(bg) ctx, cancel := context.WithCancel(bg)
wg = xsync.NewWorkGroup(ctx) wg = xsync.NewWorkGroup(ctx)
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
err1 = handshake(ctx, p1, 1) err = _handshakeClient(ctx, p1, 1)
}) })
tdelay() tdelay()
cancel() cancel()
...@@ -105,10 +129,26 @@ func TestHandshake(t *testing.T) { ...@@ -105,10 +129,26 @@ func TestHandshake(t *testing.T) {
xclose(p1) xclose(p1)
xclose(p2) xclose(p2)
err11, ok = err1.(*_HandshakeError) err_, ok = err.(*_HandshakeError)
if !ok || !(err_.Err == context.Canceled) {
if !ok || !(err11.Err == context.Canceled) { t.Errorf("handshake (client): cancel: unexpected error: %#v", err)
t.Errorf("handshake cancel: unexpected error: %#v", err1)
} }
// ctx cancel (server)
p1, p2 = net.Pipe()
ctx, cancel = context.WithCancel(bg)
wg = xsync.NewWorkGroup(ctx)
gox(wg, func(ctx context.Context) {
err = _handshakeServer(ctx, p2, 1)
})
tdelay()
cancel()
xwait(wg)
xclose(p1)
xclose(p2)
err_, ok = err.(*_HandshakeError)
if !ok || !(err_.Err == context.Canceled) {
t.Errorf("handshake (server): cancel: unexpected error: %#v", err)
}
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment