Commit 8052ef1f authored by Kirill Smelkov's avatar Kirill Smelkov

X neonet: polish a bit

parent a440d090
......@@ -20,18 +20,32 @@
// Package neonet provides service to establish links and exchange messages in
// a NEO network.
//
// XXX text (Dial, Listen, ...)
// A NEO node - node link can be established with DialLink and ListenLink
// similarly to how it is done in standard package net. Once established, a
// link (NodeLink) provides service for multiplexing several communication
// connections on top of it. Connections (Conn) in turn provide service to
// exchange NEO protocol messages.
//
// XXX lightweight-mode
// New connections can be created with link.NewConn(). Once connection is
// created and message is sent over it, on peer's side another corresponding
// new connection can be accepted via link.Accept(), and all further communication
// send/receive exchange will be happening in between those 2 connections.
//
// Use conn.Send and conn.Recv to actually exchange messages. See Conn
// documentation for other message-exchange utilities like Ask and Expect.
//
// See also package lab.nexedi.com/kirr/neo/go/neo/proto for definition of NEO
// messages.
//
//
// Lightweight mode
//
// XXX link -> lab.nexedi.com/kirr/neo/go/neo/proto
// XXX document
package neonet
//go:generate gotrace gen .
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
......@@ -40,7 +54,6 @@ import (
"reflect"
//"runtime"
"sync"
"sync/atomic"
"time"
"lab.nexedi.com/kirr/neo/go/neo/proto"
......@@ -49,15 +62,13 @@ import (
"github.com/someonegg/gocontainer/rbuf"
"lab.nexedi.com/kirr/go123/xbytes"
"lab.nexedi.com/kirr/go123/xnet"
)
// NodeLink is a node-node link in NEO
// NodeLink is a node-node link in NEO.
//
// A node-node link represents bidirectional symmetrical communication
// channel in between 2 NEO nodes. The link provides service for packets
// exchange and for multiplexing several communication connections on
// top of the node-node link.
// channel in between 2 NEO nodes. The link provides service for multiplexing
// several communication connections on top of the node-node link.
//
// New connection can be created with .NewConn() . Once connection is
// created and data is sent over it, on peer's side another corresponding
......@@ -114,9 +125,9 @@ type NodeLink struct {
// XXX ^^^ problem reproducible on deco but not on z6001
const rxghandoff = true // XXX whether to do rxghandoff trick
// Conn is a connection established over NodeLink
// Conn is a connection established over NodeLink.
//
// Data can be sent and received over it.
// Messages can be sent and received over it.
// Once connection is no longer needed it has to be closed.
//
// It is safe to use Conn from multiple goroutines simultaneously.
......@@ -166,6 +177,8 @@ var ErrLinkManyConn = errors.New("too many opened connections")
var ErrClosedConn = errors.New("connection is closed")
// LinkError is returned by NodeLink operations
//
// XXX -> when error scope is whole link ?
type LinkError struct {
Link *NodeLink
Op string
......@@ -173,6 +186,8 @@ type LinkError struct {
}
// ConnError is returned by Conn operations
//
// XXX -> when error scope is connection ?
type ConnError struct {
Link *NodeLink
ConnId uint32 // NOTE Conn's are reused - cannot use *Conn here
......@@ -180,15 +195,17 @@ type ConnError struct {
Err error
}
// LinkRole is a role an end of NodeLink is intended to play
type LinkRole int
// _LinkRole is a role an end of NodeLink is intended to play
//
// XXX _LinkRole will need to become public again if _Handshake does.
type _LinkRole int
const (
LinkServer LinkRole = iota // link created as server
LinkClient // link created as client
_LinkServer _LinkRole = iota // link created as server
_LinkClient // link created as client
// for testing:
linkNoRecvSend LinkRole = 1 << 16 // do not spawn serveRecv & serveSend
linkFlagsMask LinkRole = (1<<32 - 1) << 16
linkNoRecvSend _LinkRole = 1 << 16 // do not spawn serveRecv & serveSend
linkFlagsMask _LinkRole = (1<<32 - 1) << 16
)
// newNodeLink makes a new NodeLink from already established net.Conn
......@@ -205,12 +222,12 @@ const (
//
// Though it is possible to wrap just-established raw connection into NodeLink,
// users should always use Handshake which performs protocol handshaking first.
func newNodeLink(conn net.Conn, role LinkRole) *NodeLink {
func newNodeLink(conn net.Conn, role _LinkRole) *NodeLink {
var nextConnId uint32
switch role &^ linkFlagsMask {
case LinkServer:
case _LinkServer:
nextConnId = 0 // all initiated by us connId will be even
case LinkClient:
case _LinkClient:
nextConnId = 1 // ----//---- odd
default:
panic("invalid conn role")
......@@ -299,45 +316,45 @@ func ensureOpen(ch *chan struct{}) {
// newConn creates new Conn with id=connId and registers it into connTab.
// Must be called with connMu held.
func (nl *NodeLink) newConn(connId uint32) *Conn {
c := nl.connAlloc(connId)
nl.connTab[connId] = c
func (link *NodeLink) newConn(connId uint32) *Conn {
c := link.connAlloc(connId)
link.connTab[connId] = c
return c
}
// NewConn creates new connection on top of node-node link.
func (nl *NodeLink) NewConn() (*Conn, error) {
nl.connMu.Lock()
//defer nl.connMu.Unlock()
c, err := nl._NewConn()
nl.connMu.Unlock()
func (link *NodeLink) NewConn() (*Conn, error) {
link.connMu.Lock()
//defer link.connMu.Unlock()
c, err := link._NewConn()
link.connMu.Unlock()
return c, err
}
func (nl *NodeLink) _NewConn() (*Conn, error) {
if nl.connTab == nil {
if nl.closed.Get() != 0 {
return nil, nl.err("newconn", ErrLinkClosed)
func (link *NodeLink) _NewConn() (*Conn, error) {
if link.connTab == nil {
if link.closed.Get() != 0 {
return nil, link.err("newconn", ErrLinkClosed)
}
return nil, nl.err("newconn", ErrLinkDown)
return nil, link.err("newconn", ErrLinkDown)
}
// nextConnId could wrap around uint32 limits - find first free slot to
// not blindly replace existing connection
for i := uint32(0) ;; i++ {
_, exists := nl.connTab[nl.nextConnId]
_, exists := link.connTab[link.nextConnId]
if !exists {
break
}
nl.nextConnId += 2
link.nextConnId += 2
if i > math.MaxUint32 / 2 {
return nil, nl.err("newconn", ErrLinkManyConn)
return nil, link.err("newconn", ErrLinkManyConn)
}
}
c := nl.newConn(nl.nextConnId)
nl.nextConnId += 2
c := link.newConn(link.nextConnId)
link.nextConnId += 2
return c, nil
}
......@@ -433,7 +450,8 @@ func (nl *NodeLink) shutdown() {
// CloseAccept instructs node link to not accept incoming connections anymore.
//
// Any blocked Accept() will be unblocked and return error.
// The peer will receive "connection refused" if it tries to connect after.
// The peer will receive "connection refused" if it tries to connect after and
// for already-queued connection requests.
//
// It is safe to call CloseAccept several times.
func (link *NodeLink) CloseAccept() {
......@@ -447,12 +465,12 @@ func (link *NodeLink) CloseAccept() {
// established over node link - are automatically interrupted with an error.
// Underlying raw connection is closed.
// It is safe to call Close several times.
func (nl *NodeLink) Close() error {
nl.axclosed.Set(1)
nl.closed.Set(1)
nl.shutdown()
nl.downWg.Wait()
return nl.err("close", nl.errClose)
func (link *NodeLink) Close() error {
link.axclosed.Set(1)
link.closed.Set(1)
link.shutdown()
link.downWg.Wait()
return link.err("close", link.errClose)
}
// shutdown marks connection as no longer operational and interrupts Send and Recv.
......@@ -544,7 +562,8 @@ var connKeepClosed = 1*time.Minute
// CloseRecv closes reading end of connection.
//
// Any blocked Recv*() will be unblocked and return error.
// The peer will receive "connection closed" if it tries to send anything after.
// The peer will receive "connection closed" if it tries to send anything after
// and for messages already in local rx queue.
//
// It is safe to call CloseRecv several times.
func (c *Conn) CloseRecv() {
......@@ -561,7 +580,7 @@ func (c *Conn) CloseRecv() {
//
// It is safe to call Close several times.
func (c *Conn) Close() error {
nl := c.link
link := c.link
c.closeOnce.Do(func() {
c.rxclosed.Set(1)
c.txclosed.Set(1)
......@@ -569,14 +588,14 @@ func (c *Conn) Close() error {
// adjust link.connTab
var tmpclosed *Conn
nl.connMu.Lock()
if nl.connTab != nil {
link.connMu.Lock()
if link.connTab != nil {
// connection was initiated by us - simply delete - we always
// know if a packet comes to such connection - it is closed.
//
// XXX checking vvv should be possible without connMu lock
if c.connId == nl.nextConnId % 2 {
delete(nl.connTab, c.connId)
if c.connId == link.nextConnId % 2 {
delete(link.connTab, c.connId)
// connection was initiated by peer which we accepted - put special
// "closed" connection into connTab entry for some time to reply
......@@ -585,22 +604,22 @@ func (c *Conn) Close() error {
// ( we cannot reuse same connection since after it is marked as
// closed Send refuses to work )
} else {
// delete(nl.connTab, c.connId)
// delete(link.connTab, c.connId)
// XXX vvv was temp. disabled - costs a lot in 1req=1conn model
// c implicitly goes away from connTab
tmpclosed = nl.newConn(c.connId)
tmpclosed = link.newConn(c.connId)
}
}
nl.connMu.Unlock()
link.connMu.Unlock()
if tmpclosed != nil {
tmpclosed.shutdownRX(errConnClosed)
time.AfterFunc(connKeepClosed, func() {
nl.connMu.Lock()
delete(nl.connTab, c.connId)
nl.connMu.Unlock()
link.connMu.Lock()
delete(link.connTab, c.connId)
link.connMu.Unlock()
})
}
})
......@@ -626,33 +645,33 @@ func (link *NodeLink) errAcceptShutdownAX() error {
}
// Accept waits for and accepts incoming connection on top of node-node link.
func (nl *NodeLink) Accept() (*Conn, error) {
func (link *NodeLink) Accept() (*Conn, error) {
// semantically equivalent to the following:
// ( this is hot path for py compatibility mode because new connection
// is established in every message and select hurts performance )
//
// select {
// case <-nl.axdown:
// return nil, nl.err("accept", nl.errAcceptShutdownAX())
// case <-link.axdown:
// return nil, link.err("accept", link.errAcceptShutdownAX())
//
// case c := <-nl.acceptq:
// case c := <-link.acceptq:
// return c, nil
// }
var conn *Conn
var err error
nl.axqRead.Add(1)
axdown := nl.axdownFlag.Get() != 0
link.axqRead.Add(1)
axdown := link.axdownFlag.Get() != 0
if !axdown {
conn = <-nl.acceptq
conn = <-link.acceptq
}
nl.axqRead.Add(-1)
link.axqRead.Add(-1)
// in contrast to recvPkt we can decide about error after releasing axqRead
// reason: link is not going to be released to a free pool.
if axdown || conn == nil {
err = nl.err("accept", nl.errAcceptShutdownAX())
err = link.err("accept", link.errAcceptShutdownAX())
}
return conn, err
......@@ -1177,238 +1196,16 @@ func (nl *NodeLink) recvPkt() (*pktBuf, error) {
}
// ---- Handshake ----
// Handshake performs NEO protocol handshake just after raw connection between 2 nodes was established.
// On success raw connection is returned wrapped into NodeLink.
// On error raw connection is closed.
func Handshake(ctx context.Context, conn net.Conn, role LinkRole) (nl *NodeLink, err error) {
err = handshake(ctx, conn, proto.Version)
if err != nil {
return nil, err
}
// handshake ok -> NodeLink
return newNodeLink(conn, role), nil
}
// HandshakeError is returned when there is an error while performing handshake
type HandshakeError struct {
// XXX just keep .Conn? (but .Conn can be closed)
LocalAddr net.Addr
RemoteAddr net.Addr
Err error
}
func (e *HandshakeError) Error() string {
return fmt.Sprintf("%s - %s: handshake: %s", e.LocalAddr, e.RemoteAddr, e.Err.Error())
}
func handshake(ctx context.Context, conn net.Conn, version uint32) (err error) {
errch := make(chan error, 2)
// tx handshake word
txWg := sync.WaitGroup{}
txWg.Add(1)
go func() {
var b [4]byte
binary.BigEndian.PutUint32(b[:], version) // XXX -> hton32 ?
_, err := conn.Write(b[:])
// XXX EOF -> ErrUnexpectedEOF ?
errch <- err
txWg.Done()
}()
// rx handshake word
go func() {
var b [4]byte
_, err := io.ReadFull(conn, b[:])
if err == io.EOF {
err = io.ErrUnexpectedEOF // can be returned with n = 0
}
if err == nil {
peerVersion := binary.BigEndian.Uint32(b[:]) // XXX -> ntoh32 ?
if peerVersion != version {
err = fmt.Errorf("protocol version mismatch: peer = %08x ; our side = %08x", peerVersion, version)
}
}
errch <- err
}()
connClosed := false
defer func() {
// make sure our version is always sent on the wire, if possible,
// so that peer does not see just closed connection when on rx we see version mismatch
//
// NOTE if cancelled tx goroutine will wake up without delay
txWg.Wait()
// don't forget to close conn if returning with error + add handshake err context
if err != nil {
err = &HandshakeError{conn.LocalAddr(), conn.RemoteAddr(), err}
if !connClosed {
conn.Close()
}
}
}()
for i := 0; i < 2; i++ {
select {
case <-ctx.Done():
conn.Close() // interrupt IO
connClosed = true
return ctx.Err()
case err = <-errch:
if err != nil {
return err
}
}
}
// handshaked ok
return nil
}
// ---- Dial & Listen at raw NodeLink level ----
// DialLink connects to address on given network, handshakes and wraps the connection as NodeLink
func DialLink(ctx context.Context, net xnet.Networker, addr string) (nl *NodeLink, err error) {
peerConn, err := net.Dial(ctx, addr)
if err != nil {
return nil, err
}
return Handshake(ctx, peerConn, LinkClient)
}
// ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink.
//
// The listener accepts only those connections that pass handshake.
func ListenLink(net xnet.Networker, laddr string) (LinkListener, error) {
rawl, err := net.Listen(laddr)
if err != nil {
return nil, err
}
l := &linkListener{
l: rawl,
acceptq: make(chan linkAccepted),
closed: make(chan struct{}),
}
go l.run()
return l, nil
}
// LinkListener is net.Listener adapted to return handshaked NodeLink on Accept.
type LinkListener interface {
// from net.Listener:
Close() error
Addr() net.Addr
// Accept returns new incoming connection wrapped into NodeLink.
// It accepts only those connections which pass handshake.
Accept() (*NodeLink, error)
}
type linkListener struct {
l net.Listener
acceptq chan linkAccepted
closed chan struct {}
}
type linkAccepted struct {
link *NodeLink
err error
}
func (l *linkListener) Close() error {
err := l.l.Close()
close(l.closed)
return err
}
func (l *linkListener) run() {
// context that cancels when listener stops
runCtx, runCancel := context.WithCancel(context.Background())
defer runCancel()
for {
// stop on close
select {
case <-l.closed:
return
default:
}
// XXX add backpressure on too much incoming connections without client .Accept ?
conn, err := l.l.Accept()
go l.accept(runCtx, conn, err)
}
}
func (l *linkListener) accept(ctx context.Context, conn net.Conn, err error) {
link, err := l.accept1(ctx, conn, err)
select {
case l.acceptq <- linkAccepted{link, err}:
// ok
case <-l.closed:
// shutdown
if link != nil {
link.Close()
}
}
}
func (l *linkListener) accept1(ctx context.Context, conn net.Conn, err error) (*NodeLink, error) {
// XXX err ctx?
if err != nil {
return nil, err
}
// NOTE Handshake closes conn in case of failure
link, err := Handshake(ctx, conn, LinkServer)
if err != nil {
return nil, err
}
return link, nil
}
func (l *linkListener) Accept() (*NodeLink, error) {
select{
case <-l.closed:
// we know raw listener is already closed - return proper error about it
_, err := l.l.Accept()
return nil, err
case a := <-l.acceptq:
return a.link, a.err
}
}
func (l *linkListener) Addr() net.Addr {
return l.l.Addr()
}
// ---- for convenience: Conn -> NodeLink & local/remote link addresses ----
// LocalAddr returns local address of the underlying link to peer.
func (nl *NodeLink) LocalAddr() net.Addr {
return nl.peerLink.LocalAddr()
func (link *NodeLink) LocalAddr() net.Addr {
return link.peerLink.LocalAddr()
}
// RemoteAddr returns remote address of the underlying link to peer.
func (nl *NodeLink) RemoteAddr() net.Addr {
return nl.peerLink.RemoteAddr()
func (link *NodeLink) RemoteAddr() net.Addr {
return link.peerLink.RemoteAddr()
}
// Link returns underlying NodeLink of this connection.
......@@ -1423,9 +1220,10 @@ func (c *Conn) ConnID() uint32 {
// ---- for convenience: String / Error / Cause ----
func (nl *NodeLink) String() string {
s := fmt.Sprintf("%s - %s", nl.LocalAddr(), nl.RemoteAddr())
return s // XXX add "(closed)" if nl is closed ?
func (link *NodeLink) String() string {
s := fmt.Sprintf("%s - %s", link.LocalAddr(), link.RemoteAddr())
return s // XXX add "(closed)" if link is closed ?
// XXX other flags e.g. (down) ?
}
......@@ -1482,8 +1280,7 @@ func msgPack(connId uint32, msg proto.Msg) *pktBuf {
// TODO msgUnpack
// Recv receives message
// it receives packet and decodes message from it
// Recv receives message from the connection.
func (c *Conn) Recv() (proto.Msg, error) {
pkt, err := c.recvPkt()
if err != nil {
......@@ -1533,9 +1330,7 @@ func (link *NodeLink) sendMsg(connId uint32, msg proto.Msg) error {
// FIXME ^^^ shutdown whole link on error
}
// Send sends message.
//
// it encodes message into packet and sends it.
// Send sends message over the connection.
func (c *Conn) Send(msg proto.Msg) error {
traceMsgSendPre(c.link, c.connId, msg)
......@@ -1550,10 +1345,10 @@ func (c *Conn) sendMsgDirect(msg proto.Msg) error {
// Expect receives message and checks it is one of expected types
//
// if verification is successful the message is decoded inplace and returned
// If verification is successful the message is decoded inplace and returned
// which indicates index of received message.
//
// on error (-1, err) is returned
// On error (-1, err) is returned.
func (c *Conn) Expect(msgv ...proto.Msg) (which int, err error) {
// XXX a bit dup wrt Recv
pkt, err := c.recvPkt()
......@@ -1592,7 +1387,8 @@ func (c *Conn) _Expect(pkt *pktBuf, msgv ...proto.Msg) (int, error) {
// Ask sends request and receives response.
//
// It expects response to be exactly of resp type and errors otherwise
// It expects response to be exactly of resp type and errors otherwise.
//
// XXX clarify error semantic (when Error is decoded)
// XXX do the same as Expect wrt respv ?
func (c *Conn) Ask(req proto.Msg, resp proto.Msg) error {
......@@ -1753,22 +1549,3 @@ func (conn *Conn) _Ask1(req proto.Msg, resp proto.Msg) error {
func (req *Request) Link() *NodeLink {
return req.conn.Link()
}
// ---- misc ----
// syntax sugar for atomic load/store to raise signal/noise in logic
type atomic32 struct {
v int32 // struct member so `var a atomic32; if a == 0 ...` does not work
}
func (a *atomic32) Get() int32 {
return atomic.LoadInt32(&a.v)
}
func (a *atomic32) Set(v int32) {
atomic.StoreInt32(&a.v, v)
}
func (a *atomic32) Add(δ int32) int32 {
return atomic.AddInt32(&a.v, δ)
}
......@@ -76,11 +76,6 @@ func xwait(w interface { Wait() error }) {
exc.Raiseif(err)
}
func xhandshake(ctx context.Context, c net.Conn, version uint32) {
err := handshake(ctx, c, version)
exc.Raiseif(err)
}
func gox(wg interface { Go(func() error) }, xf func()) {
wg.Go(exc.Funcx(xf))
}
......@@ -162,10 +157,10 @@ func tdelay() {
}
// create NodeLinks connected via net.Pipe
func _nodeLinkPipe(flags1, flags2 LinkRole) (nl1, nl2 *NodeLink) {
func _nodeLinkPipe(flags1, flags2 _LinkRole) (nl1, nl2 *NodeLink) {
node1, node2 := net.Pipe()
nl1 = newNodeLink(node1, LinkClient | flags1)
nl2 = newNodeLink(node2, LinkServer | flags2)
nl1 = newNodeLink(node1, _LinkClient | flags1)
nl2 = newNodeLink(node2, _LinkServer | flags2)
return nl1, nl2
}
......@@ -665,84 +660,6 @@ func TestNodeLink(t *testing.T) {
}
func TestHandshake(t *testing.T) {
bg := context.Background()
// handshake ok
p1, p2 := net.Pipe()
wg := &errgroup.Group{}
gox(wg, func() {
xhandshake(bg, p1, 1)
})
gox(wg, func() {
xhandshake(bg, p2, 1)
})
xwait(wg)
xclose(p1)
xclose(p2)
// version mismatch
p1, p2 = net.Pipe()
var err1, err2 error
wg = &errgroup.Group{}
gox(wg, func() {
err1 = handshake(bg, p1, 1)
})
gox(wg, func() {
err2 = handshake(bg, p2, 2)
})
xwait(wg)
xclose(p1)
xclose(p2)
err1Want := "pipe - pipe: handshake: protocol version mismatch: peer = 00000002 ; our side = 00000001"
err2Want := "pipe - pipe: handshake: protocol version mismatch: peer = 00000001 ; our side = 00000002"
if !(err1 != nil && err1.Error() == err1Want) {
t.Errorf("handshake ver mismatch: p1: unexpected error:\nhave: %v\nwant: %v", err1, err1Want)
}
if !(err2 != nil && err2.Error() == err2Want) {
t.Errorf("handshake ver mismatch: p2: unexpected error:\nhave: %v\nwant: %v", err2, err2Want)
}
// tx & rx problem
p1, p2 = net.Pipe()
err1, err2 = nil, nil
wg = &errgroup.Group{}
gox(wg, func() {
err1 = handshake(bg, p1, 1)
})
gox(wg, func() {
xclose(p2)
})
xwait(wg)
xclose(p1)
err11, ok := err1.(*HandshakeError)
if !ok || !(err11.Err == io.ErrClosedPipe /* on Write */ || err11.Err == io.ErrUnexpectedEOF /* on Read */) {
t.Errorf("handshake peer close: unexpected error: %#v", err1)
}
// ctx cancel
p1, p2 = net.Pipe()
ctx, cancel := context.WithCancel(bg)
gox(wg, func() {
err1 = handshake(ctx, p1, 1)
})
tdelay()
cancel()
xwait(wg)
xclose(p1)
xclose(p2)
err11, ok = err1.(*HandshakeError)
if !ok || !(err11.Err == context.Canceled) {
t.Errorf("handshake cancel: unexpected error: %#v", err1)
}
}
// ---- recv1 mode ----
func xSend(c *Conn, msg proto.Msg) {
......@@ -1211,12 +1128,12 @@ func xlinkPipe(c1, c2 net.Conn) (*NodeLink, *NodeLink) {
wg := &errgroup.Group{}
gox(wg, func() {
l, err := Handshake(context.Background(), c1, LinkClient)
l, err := _Handshake(context.Background(), c1, _LinkClient)
exc.Raiseif(err)
l1 = l
})
gox(wg, func() {
l, err := Handshake(context.Background(), c2, LinkServer)
l, err := _Handshake(context.Background(), c2, _LinkServer)
exc.Raiseif(err)
l2 = l
})
......
// Copyright (C) 2016-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package neonet
// syntax sugar for atomic load/store to raise signal/noise in logic
import "sync/atomic"
type atomic32 struct {
v int32 // struct member so `var a atomic32; if a == 0 ...` does not work
}
func (a *atomic32) Get() int32 {
return atomic.LoadInt32(&a.v)
}
func (a *atomic32) Set(v int32) {
atomic.StoreInt32(&a.v, v)
}
func (a *atomic32) Add(δ int32) int32 {
return atomic.AddInt32(&a.v, δ)
}
// Copyright (C) 2016-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package neonet
// link establishment
import (
"context"
"encoding/binary"
"fmt"
"io"
"net"
"sync"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo/proto"
)
// ---- Handshake ----
// XXX _Handshake may be needed to become public in case when we have already
// established raw connection and want to hand-over it to NEO. But currently we
// do not have such uses.
// _Handshake performs NEO protocol handshake just after raw connection between
// 2 nodes was established.
//
// On success raw connection is returned wrapped into NodeLink.
// On error raw connection is closed.
func _Handshake(ctx context.Context, conn net.Conn, role _LinkRole) (nl *NodeLink, err error) {
err = handshake(ctx, conn, proto.Version)
if err != nil {
return nil, err
}
// handshake ok -> NodeLink
return newNodeLink(conn, role), nil
}
// _HandshakeError is returned when there is an error while performing handshake.
type _HandshakeError struct {
LocalAddr net.Addr
RemoteAddr net.Addr
Err error
}
func (e *_HandshakeError) Error() string {
return fmt.Sprintf("%s - %s: handshake: %s", e.LocalAddr, e.RemoteAddr, e.Err.Error())
}
func handshake(ctx context.Context, conn net.Conn, version uint32) (err error) {
errch := make(chan error, 2)
// tx handshake word
txWg := sync.WaitGroup{}
txWg.Add(1)
go func() {
var b [4]byte
binary.BigEndian.PutUint32(b[:], version) // XXX -> hton32 ?
_, err := conn.Write(b[:])
// XXX EOF -> ErrUnexpectedEOF ?
errch <- err
txWg.Done()
}()
// rx handshake word
go func() {
var b [4]byte
_, err := io.ReadFull(conn, b[:])
if err == io.EOF {
err = io.ErrUnexpectedEOF // can be returned with n = 0
}
if err == nil {
peerVersion := binary.BigEndian.Uint32(b[:]) // XXX -> ntoh32 ?
if peerVersion != version {
err = fmt.Errorf("protocol version mismatch: peer = %08x ; our side = %08x", peerVersion, version)
}
}
errch <- err
}()
connClosed := false
defer func() {
// make sure our version is always sent on the wire, if possible,
// so that peer does not see just closed connection when on rx we see version mismatch
//
// NOTE if cancelled tx goroutine will wake up without delay
txWg.Wait()
// don't forget to close conn if returning with error + add handshake err context
if err != nil {
err = &_HandshakeError{conn.LocalAddr(), conn.RemoteAddr(), err}
if !connClosed {
conn.Close()
}
}
}()
for i := 0; i < 2; i++ {
select {
case <-ctx.Done():
conn.Close() // interrupt IO
connClosed = true
return ctx.Err()
case err = <-errch:
if err != nil {
return err
}
}
}
// handshaked ok
return nil
}
// ---- Dial & Listen at NodeLink level ----
// DialLink connects to address on given network, performs NEO protocol
// handshake and wraps the connection as NodeLink.
func DialLink(ctx context.Context, net xnet.Networker, addr string) (*NodeLink, error) {
peerConn, err := net.Dial(ctx, addr)
if err != nil {
return nil, err
}
return _Handshake(ctx, peerConn, _LinkClient)
}
// ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink.
//
// The listener accepts only those connections that pass NEO protocol handshake.
func ListenLink(net xnet.Networker, laddr string) (LinkListener, error) {
rawl, err := net.Listen(laddr)
if err != nil {
return nil, err
}
l := &linkListener{
l: rawl,
acceptq: make(chan linkAccepted),
closed: make(chan struct{}),
}
go l.run()
return l, nil
}
// LinkListener is net.Listener adapted to return handshaked NodeLink on Accept.
type LinkListener interface {
// from net.Listener:
Close() error
Addr() net.Addr
// Accept returns new incoming connection wrapped into NodeLink.
// It accepts only those connections which pass NEO protocol handshake.
Accept() (*NodeLink, error)
}
type linkListener struct {
l net.Listener
acceptq chan linkAccepted
closed chan struct {}
}
type linkAccepted struct {
link *NodeLink
err error
}
func (l *linkListener) Close() error {
err := l.l.Close()
close(l.closed)
return err
}
func (l *linkListener) run() {
// context that cancels when listener stops
runCtx, runCancel := context.WithCancel(context.Background())
defer runCancel()
for {
// stop on close
select {
case <-l.closed:
return
default:
}
// XXX add backpressure on too much incoming connections without client .Accept ?
conn, err := l.l.Accept()
go l.accept(runCtx, conn, err)
}
}
func (l *linkListener) accept(ctx context.Context, conn net.Conn, err error) {
link, err := l.accept1(ctx, conn, err)
select {
case l.acceptq <- linkAccepted{link, err}:
// ok
case <-l.closed:
// shutdown
if link != nil {
link.Close()
}
}
}
func (l *linkListener) accept1(ctx context.Context, conn net.Conn, err error) (*NodeLink, error) {
// XXX err ctx?
if err != nil {
return nil, err
}
// NOTE Handshake closes conn in case of failure
link, err := _Handshake(ctx, conn, _LinkServer)
if err != nil {
return nil, err
}
return link, nil
}
func (l *linkListener) Accept() (*NodeLink, error) {
select{
case <-l.closed:
// we know raw listener is already closed - return proper error about it
_, err := l.l.Accept()
return nil, err
case a := <-l.acceptq:
return a.link, a.err
}
}
func (l *linkListener) Addr() net.Addr {
return l.l.Addr()
}
// Copyright (C) 2016-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package neonet
import (
"context"
"io"
"net"
"testing"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/exc"
)
func xhandshake(ctx context.Context, c net.Conn, version uint32) {
err := handshake(ctx, c, version)
exc.Raiseif(err)
}
func TestHandshake(t *testing.T) {
bg := context.Background()
// handshake ok
p1, p2 := net.Pipe()
wg := &errgroup.Group{}
gox(wg, func() {
xhandshake(bg, p1, 1)
})
gox(wg, func() {
xhandshake(bg, p2, 1)
})
xwait(wg)
xclose(p1)
xclose(p2)
// version mismatch
p1, p2 = net.Pipe()
var err1, err2 error
wg = &errgroup.Group{}
gox(wg, func() {
err1 = handshake(bg, p1, 1)
})
gox(wg, func() {
err2 = handshake(bg, p2, 2)
})
xwait(wg)
xclose(p1)
xclose(p2)
err1Want := "pipe - pipe: handshake: protocol version mismatch: peer = 00000002 ; our side = 00000001"
err2Want := "pipe - pipe: handshake: protocol version mismatch: peer = 00000001 ; our side = 00000002"
if !(err1 != nil && err1.Error() == err1Want) {
t.Errorf("handshake ver mismatch: p1: unexpected error:\nhave: %v\nwant: %v", err1, err1Want)
}
if !(err2 != nil && err2.Error() == err2Want) {
t.Errorf("handshake ver mismatch: p2: unexpected error:\nhave: %v\nwant: %v", err2, err2Want)
}
// tx & rx problem
p1, p2 = net.Pipe()
err1, err2 = nil, nil
wg = &errgroup.Group{}
gox(wg, func() {
err1 = handshake(bg, p1, 1)
})
gox(wg, func() {
xclose(p2)
})
xwait(wg)
xclose(p1)
err11, ok := err1.(*_HandshakeError)
if !ok || !(err11.Err == io.ErrClosedPipe /* on Write */ || err11.Err == io.ErrUnexpectedEOF /* on Read */) {
t.Errorf("handshake peer close: unexpected error: %#v", err1)
}
// ctx cancel
p1, p2 = net.Pipe()
ctx, cancel := context.WithCancel(bg)
gox(wg, func() {
err1 = handshake(ctx, p1, 1)
})
tdelay()
cancel()
xwait(wg)
xclose(p1)
xclose(p2)
err11, ok = err1.(*_HandshakeError)
if !ok || !(err11.Err == context.Canceled) {
t.Errorf("handshake cancel: unexpected error: %#v", err1)
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment