Commit 7a0674c2 authored by Levin Zimmermann's avatar Levin Zimmermann Committed by Kirill Smelkov

Fix flaky `client_test.go/TestLoad`

/reviewed-by @kirr
/reviewed-on kirr/neo!5

* t-fix-flaky-testload:
  fixup! neonet/newlink: Fix lost conn in encoding detector
  go/neo/neonet: Fix client handshake not to accept server encoding if it is different from what client indicated
  go/neo/neonet: Demonstrate problem in handshake with NEO/py
  go/neo/neonet: Dedicate an error type to indicate "protocol version mismatch" as handshake failure cause
  fixup! client_test: Keep NEO srv logs if test fails
  fixup! client_test/NEOSrv += LogContent for better debug
  neonet/newlink: Fix lost conn in encoding detector
  client_test: Keep NEO srv logs if test fails
  client_test += print NEO server log if >=1 test(s) failed
  client_test/NEOSrv += LogContent for better debug
parents 4c9414ea 917bacd2
// Copyright (C) 2020-2021 Nexedi SA and Contributors. // Copyright (C) 2020-2023 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -27,6 +27,7 @@ import ( ...@@ -27,6 +27,7 @@ import (
"net/url" "net/url"
"os" "os"
"os/exec" "os/exec"
"strings"
"testing" "testing"
"time" "time"
...@@ -54,6 +55,8 @@ type NEOSrv interface { ...@@ -54,6 +55,8 @@ type NEOSrv interface {
// BugEncFixed reports whether server, instead of autodetection, // BugEncFixed reports whether server, instead of autodetection,
// supports only fixed encoding, and, if yes, which one. // supports only fixed encoding, and, if yes, which one.
BugEncFixed() (bool, proto.Encoding) BugEncFixed() (bool, proto.Encoding)
LogTail() (string, error) // tail of log files
} }
// NEOSrvOptions represents options for a NEO server. // NEOSrvOptions represents options for a NEO server.
...@@ -90,6 +93,7 @@ func (_ *NEOPySrv) BugEncFixed() (bool, proto.Encoding) { ...@@ -90,6 +93,7 @@ func (_ *NEOPySrv) BugEncFixed() (bool, proto.Encoding) {
return true, 'N' // NEO/py 1.12 return true, 'N' // NEO/py 1.12
} }
// StartNEOPySrv starts NEO/py server specified by options. // StartNEOPySrv starts NEO/py server specified by options.
func StartNEOPySrv(opt NEOSrvOptions) (_ *NEOPySrv, err error) { func StartNEOPySrv(opt NEOSrvOptions) (_ *NEOPySrv, err error) {
workdir := opt.workdir workdir := opt.workdir
...@@ -174,6 +178,38 @@ func (n *NEOPySrv) URL() string { ...@@ -174,6 +178,38 @@ func (n *NEOPySrv) URL() string {
return fmt.Sprintf("%s%s/%s", n.opt.URLPrefix(), n.masterAddr, n.clusterName()) return fmt.Sprintf("%s%s/%s", n.opt.URLPrefix(), n.masterAddr, n.clusterName())
} }
// LogTail returns tails of the storage and master log files of the NEO/py
// server, storage first, separated by an empty line.
//
// If the tail of any log file cannot be retrieved, that error is returned
// together with empty string.
func (n *NEOPySrv) LogTail() (string, error) {
	// FIXME also include admin_0.log and generally detect all log files automatically
	logfilev := []string{"storage_0.log", "master_0.log"}

	var tailv []string
	for _, logfile := range logfilev {
		t, err := n.logFileTail(logfile)
		if err != nil {
			return "", err
		}
		tailv = append(tailv, t)
	}
	return strings.Join(tailv, "\n\n"), nil
}
// logFileTail returns the tail of log file logfilename located in the server
// working directory, prefixed with a header that names the file.
//
// The log is rendered by running `python -m neo.scripts.neolog` on the file;
// the tail is taken from what that command's Output() returns (presumably its
// stdout - verify against xexec). An error from the command is returned as is.
func (n *NEOPySrv) logFileTail(logfilename string) (string, error) {
	logpath := fmt.Sprintf("%s/%s", n.opt.workdir, logfilename)
	out, err := xexec.Command("python", "-m", "neo.scripts.neolog", logpath).Output()
	if err != nil {
		return "", err
	}

	header := fmt.Sprintf("log file '%s' tail:\n\n", logfilename)
	return header + tail(string(out)), nil
}
// tail returns the last 20 "\n"-separated lines of s.
//
// If s has fewer than 20 lines it is returned unchanged. A trailing "\n" in s
// counts as being followed by one more, empty, line.
func tail(s string) string {
	const maxLines = 20

	lines := strings.Split(s, "\n")
	start := len(lines) - maxLines
	if start < 0 {
		start = 0 // fewer lines than requested -> keep everything
	}
	return strings.Join(lines[start:], "\n")
}
func (n *NEOPySrv) Close() (err error) { func (n *NEOPySrv) Close() (err error) {
defer xerr.Contextf(&err, "stop neo/py %s", n.opt.workdir) defer xerr.Contextf(&err, "stop neo/py %s", n.opt.workdir)
...@@ -281,6 +317,10 @@ func StartNEOGoSrv(opt NEOSrvOptions) (_ *NEOGoSrv, err error) { ...@@ -281,6 +317,10 @@ func StartNEOGoSrv(opt NEOSrvOptions) (_ *NEOGoSrv, err error) {
return n, nil return n, nil
} }
// LogTail has the signature required by the NEOSrv interface.
//
// For NEO/go it is currently a stub: it always returns empty string and nil
// error, because retrieving log data from glog is not implemented yet.
func (n *NEOGoSrv) LogTail() (string, error) {
return "", nil // FIXME return data from glog
}
func (n *NEOGoSrv) Close() (err error) { func (n *NEOGoSrv) Close() (err error) {
defer xerr.Contextf(&err, "stop neo/go %s", n.opt.workdir) defer xerr.Contextf(&err, "stop neo/go %s", n.opt.workdir)
...@@ -384,7 +424,13 @@ func withNEOSrv(t *testing.T, f func(t *testing.T, nsrv NEOSrv), optv ...tOption ...@@ -384,7 +424,13 @@ func withNEOSrv(t *testing.T, f func(t *testing.T, nsrv NEOSrv), optv ...tOption
t.Helper() t.Helper()
X := xtesting.FatalIf(t) X := xtesting.FatalIf(t)
work, err := ioutil.TempDir("", "neo"); X(err) work, err := ioutil.TempDir("", "neo"); X(err)
defer os.RemoveAll(work) defer func() {
if t.Failed() {
t.Logf("leaving NEO database and server log files in %s", work)
} else {
os.RemoveAll(work)
}
}()
f(work) f(work)
} }
...@@ -533,6 +579,17 @@ func withNEO(t *testing.T, f func(t *testing.T, nsrv NEOSrv, ndrv *Client), optv ...@@ -533,6 +579,17 @@ func withNEO(t *testing.T, f func(t *testing.T, nsrv NEOSrv, ndrv *Client), optv
} }
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
defer func() {
if t.Failed() {
logTail, err := nsrv.LogTail()
if err != nil {
t.Logf("LogTail failed: %s", err)
} else {
t.Logf("NEO log tail:\n\n%s", logTail)
}
}
}()
X := xtesting.FatalIf(t) X := xtesting.FatalIf(t)
if noautodetect && len(encv) > 1 && encv[0] == srvEnc { if noautodetect && len(encv) > 1 && encv[0] == srvEnc {
......
// Copyright (C) 2016-2021 Nexedi SA and Contributors. // Copyright (C) 2016-2023 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -59,6 +59,19 @@ type _HandshakeError struct { ...@@ -59,6 +59,19 @@ type _HandshakeError struct {
Err error Err error
} }
// _VersionMismatchError is reported as cause when handshake detects that peer
// uses protocol version different than ours.
//
// NOTE fields order matters: handshake code constructs this error with
// positional literal {local, remote}.
type _VersionMismatchError struct {
// LocalVer is protocol version of our side.
LocalVer uint32
// RemoteVer is protocol version announced by peer.
RemoteVer uint32
}
// _EncodingMismatchError is reported as cause when handshake detects that peer does not accept our encoding.
//
// NOTE fields order matters: handshake code constructs this error with
// positional literal {local, remote}.
type _EncodingMismatchError struct {
// LocalEnc is encoding our side indicated in its hello.
LocalEnc proto.Encoding
// RemoteEnc is encoding peer replied with.
RemoteEnc proto.Encoding
}
func (e *_HandshakeError) Error() string { func (e *_HandshakeError) Error() string {
role := "" role := ""
switch e.LocalRole { switch e.LocalRole {
...@@ -72,26 +85,32 @@ func (e *_HandshakeError) Error() string { ...@@ -72,26 +85,32 @@ func (e *_HandshakeError) Error() string {
func (e *_HandshakeError) Cause() error { return e.Err } func (e *_HandshakeError) Cause() error { return e.Err }
func (e *_HandshakeError) Unwrap() error { return e.Err } func (e *_HandshakeError) Unwrap() error { return e.Err }
// Error implements error.
//
// Both versions are printed as zero-padded 8-digit hex, peer version first.
func (e *_VersionMismatchError) Error() string {
	peer, ours := e.RemoteVer, e.LocalVer
	return fmt.Sprintf("protocol version mismatch: peer = %08x ; our side = %08x", peer, ours)
}
// Error implements error.
//
// Both encodings are printed with %q, peer encoding first.
func (e *_EncodingMismatchError) Error() string {
	peer, ours := e.RemoteEnc, e.LocalEnc
	return fmt.Sprintf("protocol encoding mismatch: peer = %q ; our side = %q", peer, ours)
}
// handshakeClient implements client-side NEO protocol handshake just after raw // handshakeClient implements client-side NEO protocol handshake just after raw
// connection between 2 nodes was established. // connection between 2 nodes was established.
// //
// Client indicates its version and preferred encoding, but accepts any // Client indicates its version and encoding.
// encoding chosen to use by server.
// //
// On success raw connection is returned wrapped into NodeLink. // On success raw connection is returned wrapped into NodeLink.
// On error raw connection is closed. // On error raw connection is closed.
func handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPrefer proto.Encoding) (*NodeLink, error) { func handshakeClient(ctx context.Context, conn net.Conn, version uint32, encoding proto.Encoding) (*NodeLink, error) {
enc, rxbuf, err := _handshakeClient(ctx, conn, version, encPrefer) rxbuf, err := _handshakeClient(ctx, conn, version, encoding)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return newNodeLink(conn, enc, _LinkClient, rxbuf), nil return newNodeLink(conn, encoding, _LinkClient, rxbuf), nil
} }
// handshakeServer implements server-side NEO protocol handshake just after raw // handshakeServer implements server-side NEO protocol handshake just after raw
// connection between 2 nodes was established. // connection between 2 nodes was established.
// //
// Server verifies that its version matches Client and accepts client preferred encoding. // Server verifies that its version matches Client and accepts client encoding.
// //
// On success raw connection is returned wrapped into NodeLink. // On success raw connection is returned wrapped into NodeLink.
// On error raw connection is closed. // On error raw connection is closed.
...@@ -103,7 +122,7 @@ func handshakeServer(ctx context.Context, conn net.Conn, version uint32) (*NodeL ...@@ -103,7 +122,7 @@ func handshakeServer(ctx context.Context, conn net.Conn, version uint32) (*NodeL
return newNodeLink(conn, enc, _LinkServer, rxbuf), nil return newNodeLink(conn, enc, _LinkServer, rxbuf), nil
} }
func _handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPrefer proto.Encoding) (enc proto.Encoding, rxbuf *xbufReader, err error) { func _handshakeClient(ctx context.Context, conn net.Conn, version uint32, encoding proto.Encoding) (rxbuf *xbufReader, err error) {
defer func() { defer func() {
if err != nil { if err != nil {
err = &_HandshakeError{_LinkClient, conn.LocalAddr(), conn.RemoteAddr(), err} err = &_HandshakeError{_LinkClient, conn.LocalAddr(), conn.RemoteAddr(), err}
...@@ -115,7 +134,7 @@ func _handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPre ...@@ -115,7 +134,7 @@ func _handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPre
var peerEnc proto.Encoding var peerEnc proto.Encoding
err = xio.WithCloseOnErrCancel(ctx, conn, func() error { err = xio.WithCloseOnErrCancel(ctx, conn, func() error {
// tx client hello // tx client hello
err := txHello("tx hello", conn, version, encPrefer) err := txHello("tx hello", conn, version, encoding)
if err != nil { if err != nil {
return err return err
} }
...@@ -129,7 +148,12 @@ func _handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPre ...@@ -129,7 +148,12 @@ func _handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPre
// verify version // verify version
if peerVer != version { if peerVer != version {
return fmt.Errorf("protocol version mismatch: peer = %08x ; our side = %08x", peerVer, version) return &_VersionMismatchError{version, peerVer}
}
// verify encoding
if peerEnc != encoding {
return &_EncodingMismatchError{encoding, peerEnc}
} }
return nil return nil
...@@ -138,12 +162,10 @@ func _handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPre ...@@ -138,12 +162,10 @@ func _handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPre
if ctx.Err() != nil { if ctx.Err() != nil {
err = ctx.Err() // error was due to ctx cancel err = ctx.Err() // error was due to ctx cancel
} }
return 0, nil, err return nil, err
} }
// use peer encoding (server should return the same, but we are ok if return rxbuf, nil
// it asks to switch to different)
return peerEnc, rxbuf, nil
} }
func _handshakeServer(ctx context.Context, conn net.Conn, version uint32) (enc proto.Encoding, rxbuf *xbufReader, err error) { func _handshakeServer(ctx context.Context, conn net.Conn, version uint32) (enc proto.Encoding, rxbuf *xbufReader, err error) {
...@@ -176,7 +198,7 @@ func _handshakeServer(ctx context.Context, conn net.Conn, version uint32) (enc p ...@@ -176,7 +198,7 @@ func _handshakeServer(ctx context.Context, conn net.Conn, version uint32) (enc p
// verify version // verify version
if peerVer != version { if peerVer != version {
return fmt.Errorf("protocol version mismatch: peer = %08x ; our side = %08x", peerVer, version) return &_VersionMismatchError{version, peerVer}
} }
return nil return nil
...@@ -306,10 +328,23 @@ func DialLink(ctx context.Context, net xnet.Networker, addr string) (link *NodeL ...@@ -306,10 +328,23 @@ func DialLink(ctx context.Context, net xnet.Networker, addr string) (link *NodeL
link, err = handshakeClient(ctx, peerConn, proto.Version, enc) link, err = handshakeClient(ctx, peerConn, proto.Version, enc)
// NEO/py closes connection if it sees unexpected magic, version, etc. // If the peers encoding is different than our encoding two different
// -> in such case retry with next encoding trying to autodetect and match server. // scenarios can happen, because the handshake order is undefined (e.g.
// we don't know if our handshake is received before the peer sends its
// handshake):
//
// 1. Our handshake is received before peer sends its handshake, NEO/py
// closes connection if it sees unexpected magic, version, etc.
//
// 2. The peer already sends its handshake before it processes our handshake.
// In this case it initially sends us its version, we can extract its encoding,
// and only later, once it has processed our handshake with the bad encoding,
// it closes the connection.
//
// -> in both cases retry with next encoding trying to autodetect and match server.
// -> stop trying on success, or on any other error. // -> stop trying on success, or on any other error.
if err == nil || !errors.Is(err, io.ErrUnexpectedEOF) { var errEnc *_EncodingMismatchError
if err == nil || !(errors.Is(err, io.ErrUnexpectedEOF) || errors.As(err, &errEnc)) {
break break
} }
} }
......
// Copyright (C) 2016-2021 Nexedi SA and Contributors. // Copyright (C) 2016-2023 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -24,6 +24,7 @@ import ( ...@@ -24,6 +24,7 @@ import (
"errors" "errors"
"io" "io"
"net" "net"
"reflect"
"testing" "testing"
"lab.nexedi.com/kirr/go123/exc" "lab.nexedi.com/kirr/go123/exc"
...@@ -31,13 +32,10 @@ import ( ...@@ -31,13 +32,10 @@ import (
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
) )
// _xhandshakeClient handshakes as client with encPrefer encoding and verifies that server accepts it. // _xhandshakeClient handshakes as client with specified version and encoding and verifies that server accepts it.
func _xhandshakeClient(ctx context.Context, c net.Conn, version uint32, encPrefer proto.Encoding) { func _xhandshakeClient(ctx context.Context, c net.Conn, version uint32, encoding proto.Encoding) {
enc, _, err := _handshakeClient(ctx, c, version, encPrefer) _, err := _handshakeClient(ctx, c, version, encoding)
exc.Raiseif(err) exc.Raiseif(err)
if enc != encPrefer {
exc.Raisef("enc (%c) != encPrefer (%c)", enc, encPrefer)
}
} }
// _xhandshakeServer handshakes as server and verifies negotiated encoding to be encOK. // _xhandshakeServer handshakes as server and verifies negotiated encoding to be encOK.
...@@ -72,7 +70,7 @@ func _TestHandshake(t *T) { ...@@ -72,7 +70,7 @@ func _TestHandshake(t *T) {
var err1, err2 error var err1, err2 error
wg = xsync.NewWorkGroup(bg) wg = xsync.NewWorkGroup(bg)
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
_, _, err1 = _handshakeClient(ctx, p1, 1, t.enc) _, err1 = _handshakeClient(ctx, p1, 1, t.enc)
}) })
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
_, _, err2 = _handshakeServer(ctx, p2, 2) _, _, err2 = _handshakeServer(ctx, p2, 2)
...@@ -81,22 +79,84 @@ func _TestHandshake(t *T) { ...@@ -81,22 +79,84 @@ func _TestHandshake(t *T) {
xclose(p1) xclose(p1)
xclose(p2) xclose(p2)
err1Want := "pipe - pipe: handshake (client): protocol version mismatch: peer = 00000002 ; our side = 00000001" e1ok := &_HandshakeError{
err2Want := "pipe - pipe: handshake (server): protocol version mismatch: peer = 00000001 ; our side = 00000002" LocalRole: _LinkClient,
LocalAddr: p1.LocalAddr(),
RemoteAddr: p1.RemoteAddr(),
Err: &_VersionMismatchError{
LocalVer: 1,
RemoteVer: 2,
},
}
e2ok := &_HandshakeError{
LocalRole: _LinkServer,
LocalAddr: p2.LocalAddr(),
RemoteAddr: p2.RemoteAddr(),
Err: &_VersionMismatchError{
LocalVer: 2,
RemoteVer: 1,
},
}
if !(err1 != nil && err1.Error() == err1Want) { if !reflect.DeepEqual(err1, e1ok) {
t.Errorf("handshake ver mismatch: p1: unexpected error:\nhave: %v\nwant: %v", err1, err1Want) t.Errorf("handshake ver mismatch: p1: unexpected error:\nhave: %#v\n %q\nwant: %#v\n %q",
err1, estr(err1), e1ok, e1ok.Error())
} }
if !(err2 != nil && err2.Error() == err2Want) { if !reflect.DeepEqual(err2, e2ok) {
t.Errorf("handshake ver mismatch: p2: unexpected error:\nhave: %v\nwant: %v", err2, err2Want) t.Errorf("handshake ver mismatch: p2: unexpected error:\nhave: %#v\n %q\nwant: %#v\n %q",
err2, estr(err2), e2ok, e2ok.Error())
} }
// tx & rx problem (client) // encoding mismatch (mimic behaviour of NEO/py server who does not accept client-proposed encoding)
p1, p2 = net.Pipe() p1, p2 = net.Pipe()
var err error var err error
var srvEnc proto.Encoding
switch t.enc {
case 'N': srvEnc = 'M'
case 'M': srvEnc = 'N'
default: panic("bug")
}
wg = xsync.NewWorkGroup(bg)
gox(wg, func(ctx context.Context) {
_, err = _handshakeClient(ctx, p1, 1, t.enc)
})
gox(wg, func(ctx context.Context) {
wg := xsync.NewWorkGroup(ctx)
gox(wg, func(ctx context.Context) {
err := txHello("tx hello", p2, 1, srvEnc)
exc.Raiseif(err)
})
gox(wg, func(ctx context.Context) {
rxbuf := newXBufReader(p2, 1024)
_, _, err := rxHello("rx hello", rxbuf)
exc.Raiseif(err)
})
xwait(wg)
})
xwait(wg)
xclose(p1)
xclose(p2)
eok := &_HandshakeError{
LocalRole: _LinkClient,
LocalAddr: p1.LocalAddr(),
RemoteAddr: p1.RemoteAddr(),
Err: &_EncodingMismatchError{
LocalEnc: t.enc,
RemoteEnc: srvEnc,
},
}
if !reflect.DeepEqual(err, eok) {
t.Errorf("handshake encoding mismatch: client: unexpected error:\nhave: %#v\n %q\nwant: %#v\n %q",
err, estr(err), eok, eok.Error())
}
// tx & rx problem (client)
p1, p2 = net.Pipe()
wg = xsync.NewWorkGroup(bg) wg = xsync.NewWorkGroup(bg)
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
_, _, err = _handshakeClient(ctx, p1, 1, t.enc) _, err = _handshakeClient(ctx, p1, 1, t.enc)
}) })
gox(wg, func(_ context.Context) { gox(wg, func(_ context.Context) {
xclose(p2) xclose(p2)
...@@ -131,7 +191,7 @@ func _TestHandshake(t *T) { ...@@ -131,7 +191,7 @@ func _TestHandshake(t *T) {
ctx, cancel := context.WithCancel(bg) ctx, cancel := context.WithCancel(bg)
wg = xsync.NewWorkGroup(ctx) wg = xsync.NewWorkGroup(ctx)
gox(wg, func(ctx context.Context) { gox(wg, func(ctx context.Context) {
_, _, err = _handshakeClient(ctx, p1, 1, t.enc) _, err = _handshakeClient(ctx, p1, 1, t.enc)
}) })
tdelay() tdelay()
cancel() cancel()
...@@ -162,3 +222,12 @@ func _TestHandshake(t *T) { ...@@ -162,3 +222,12 @@ func _TestHandshake(t *T) {
t.Errorf("handshake (server): cancel: unexpected error: %#v", err) t.Errorf("handshake (server): cancel: unexpected error: %#v", err)
} }
} }
// estr renders err as text: err.Error() for non-nil error, or "<nil>" otherwise.
func estr(err error) string {
	if err != nil {
		return err.Error()
	}
	return "<nil>"
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment