// Copyright (C) 2016-2017  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.

package neo
// Connection management

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"net"
	"reflect"
	"sync"
	"sync/atomic"

	"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
)

// NodeLink is a node-node link in NEO.
//
// A node-node link represents a bidirectional symmetrical communication
// channel between 2 NEO nodes. The link provides a service for packet
// exchange and for multiplexing several communication connections on
// top of the node-node link.
//
// A new connection can be created with .NewConn(). Once a connection is
// created and data is sent over it, on the peer's side another corresponding
// new connection can be accepted via .Accept(), and all further send/receive
// exchange will happen between those 2 connections.
//
// For a node to be able to accept new incoming connections it has to have
// the "server" role - see newNodeLink() for details.
//
// A NodeLink has to be explicitly closed once it is no longer needed.
//
// It is safe to use NodeLink from multiple goroutines simultaneously.
type NodeLink struct {
	peerLink net.Conn // raw conn to peer

	connMu     sync.Mutex
	connTab    map[uint32]*Conn // connId -> Conn associated with connId
	nextConnId uint32           // next connId to use for Conn initiated by us

	serveWg sync.WaitGroup // for serve{Send,Recv}
	acceptq chan *Conn     // queue of incoming connections for Accept
	                       // = nil if NodeLink is not accepting connections
	txq     chan txReq     // tx requests from Conns go via here
	                       // (rx packets are routed to Conn.rxq)

	down     chan struct{}  // ready when NodeLink is marked as no longer operational
	downOnce sync.Once      // shutdown may be due to both Close and IO error
	downWg   sync.WaitGroup // for activities at shutdown
	errClose error          // error got from peerLink.Close

	errMu   sync.Mutex
	errRecv error // error got from recvPkt on shutdown

	closed uint32 // whether Close was called
}
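// Informal usage sketch of the NodeLink/Conn API (not code used by the
// package): a typical flow for opening a connection over an established
// link and exchanging messages on it. `link` and `req` are assumed to come
// from the caller's context:
//
//	conn, err := link.NewConn()
//	if err != nil {
//		// link is closed or down
//	}
//	defer conn.Close()
//
//	if err := conn.Send(req); err != nil {
//		// handle send error
//	}
//	resp, err := conn.Recv()
//	// ... process resp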
// Conn is a connection established over NodeLink.
//
// Data can be sent and received over it.
// Once a connection is no longer needed, it has to be closed.
//
// It is safe to use Conn from multiple goroutines simultaneously.
type Conn struct {
	nodeLink *NodeLink
	connId   uint32
	rxq      chan *PktBuf // received packets for this Conn go here
	txerr    chan error   // transmit results for this Conn go back here

	down      chan struct{} // ready when Conn is marked as no longer operational
	downOnce  sync.Once     // shutdown may be called by both Close and nodelink.shutdown
	rxerrOnce sync.Once     // rx error is reported only once - then it is link down or closed

	closed uint32 // whether Close was called
}

var ErrLinkClosed = errors.New("node link is closed") // operations on closed NodeLink
var ErrLinkDown = errors.New("node link is down")     // e.g. due to IO error
var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
var ErrClosedConn = errors.New("connection is closed")

// LinkError is usually returned by NodeLink operations.
type LinkError struct {
	Link *NodeLink
	Op   string
	Err  error
}

// ConnError is usually returned by Conn operations.
type ConnError struct {
	Conn *Conn
	Op   string
	Err  error
}

// LinkRole is a role an end of NodeLink is intended to play.
type LinkRole int

const (
	LinkServer LinkRole = iota // link created as server
	LinkClient                 // link created as client

	// for testing:
	linkNoRecvSend LinkRole = 1 << 16 // do not spawn serveRecv & serveSend
	linkFlagsMask  LinkRole = (1<<32 - 1) << 16
)

// newNodeLink makes a new NodeLink from an already established net.Conn.
//
// Role specifies how to treat our role on the link - either as client or
// server. The differences between the client and server roles are:
//
// 1. how connection ids are allocated for connections initiated at our side:
//    there is no conflict in identifiers if one side always allocates them as
//    even (server) and its peer as odd (client).
//
// 2. NodeLink.Accept() works only on the server side.
//    XXX vs client processing e.g. invalidation notifications from master ?
//    XXX -> we could require that such listen-for-notification Conn is the
//    one left after RequestIdentification/AcceptIdentification.
//
// Usually the server role should be used for connections created via
// net.Listen/net.Accept and the client role for connections created via net.Dial.
//
// Though it is possible to wrap a just-established raw connection into NodeLink,
// users should always use Handshake, which performs protocol handshaking first.
func newNodeLink(conn net.Conn, role LinkRole) *NodeLink {
	var nextConnId uint32
	var acceptq chan *Conn
	switch role &^ linkFlagsMask {
	case LinkServer:
		nextConnId = 0             // all connId initiated by us will be even
		acceptq = make(chan *Conn) // accept queue; TODO use backlog
	case LinkClient:
		nextConnId = 1 // ----//---- odd
		acceptq = nil  // not accepting incoming connections
	default:
		panic("invalid conn role")
	}

	nl := &NodeLink{
		peerLink:   conn,
		connTab:    map[uint32]*Conn{},
		nextConnId: nextConnId,
		acceptq:    acceptq,
		txq:        make(chan txReq),
		down:       make(chan struct{}),
	}
	if role&linkNoRecvSend == 0 {
		nl.serveWg.Add(2)
		go nl.serveRecv()
		go nl.serveSend()
	}
	return nl
}
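// Informal illustration of the connId parity scheme described above (a
// sketch, not code used by the package): the server end starts at 0, the
// client end at 1, and both advance by 2 per NewConn, so ids initiated from
// the two sides never collide:
//
//	server-initiated connections: 0, 2, 4, ...
//	client-initiated connections: 1, 3, 5, ...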
// newConn creates a new Conn with id=connId and registers it into connTab.
// Must be called with connMu held.
func (nl *NodeLink) newConn(connId uint32) *Conn {
	c := &Conn{nodeLink: nl,
		connId: connId,
		rxq:    make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv
		txerr:  make(chan error, 1),   // NOTE non-blocking - see Conn.Send
		down:   make(chan struct{}),
	}
	nl.connTab[connId] = c
	return c
}

// NewConn creates a new connection on top of the node-node link.
func (nl *NodeLink) NewConn() (*Conn, error) {
	nl.connMu.Lock()
	defer nl.connMu.Unlock()
	if nl.connTab == nil {
		if atomic.LoadUint32(&nl.closed) != 0 {
			return nil, nl.err("newconn", ErrLinkClosed)
		}
		return nil, nl.err("newconn", ErrLinkDown)
	}
	c := nl.newConn(nl.nextConnId)
	nl.nextConnId += 2
	return c, nil
}

// shutdown closes the raw link to peer and marks NodeLink as no longer operational.
// It also shuts down all connections opened over this node link.
func (nl *NodeLink) shutdown() {
	nl.downOnce.Do(func() {
		close(nl.down)

		// close actual link to peer. this will wake up {send,recv}Pkt
		// NOTE we need it here so that e.g. aborting on error in serveSend wakes up serveRecv
		nl.errClose = nl.peerLink.Close()

		nl.downWg.Add(1)
		go func() {
			defer nl.downWg.Done()

			// wait for serve{Send,Recv} to complete before shutting connections down
			//
			// we have to do it so that e.g. serveSend has a chance
			// to return the last error from sendPkt to the requester.
			nl.serveWg.Wait()

			nl.connMu.Lock()
			for _, conn := range nl.connTab {
				// NOTE anything waking up on Conn.down must not lock
				// connMu - else it will deadlock.
				conn.shutdown()
			}
			nl.connTab = nil // clear + mark down
			nl.connMu.Unlock()
		}()
	})
}

// Close closes the node-node link.
// All blocking operations - Accept and IO on associated connections
// established over the node link - are automatically interrupted with an error.
// The underlying raw connection is closed.
// It is safe to call Close several times.
func (nl *NodeLink) Close() error {
	atomic.StoreUint32(&nl.closed, 1)
	nl.shutdown()
	nl.downWg.Wait()
	return nl.err("close", nl.errClose)
}

// shutdown marks the connection as no longer operational.
func (c *Conn) shutdown() {
	c.downOnce.Do(func() {
		close(c.down)
	})
}

// Close closes the connection.
// Any blocked Send*() or Recv*() will be unblocked and return an error.
//
// NOTE for Send() - once transmission has started - it will complete in the
// background on the wire, not to break node-node link framing.
//
// It is safe to call Close several times.
//
// TODO Close on one end must make Recv/Send on the other end fail
// (UC: sending []txn-info)
func (c *Conn) Close() error {
	// adjust nodeLink.connTab
	// (if nodelink was already shut down and connTab=nil - delete will be a noop)
	c.nodeLink.connMu.Lock()
	delete(c.nodeLink.connTab, c.connId)
	c.nodeLink.connMu.Unlock()

	atomic.StoreUint32(&c.closed, 1)
	c.shutdown()
	return nil
}

// Accept waits for and accepts an incoming connection on top of the node-node link.
func (nl *NodeLink) Accept() (c *Conn, err error) {
	defer func() {
		if err != nil {
			err = nl.err("accept", err)
		}
	}()

	// this node link is not accepting connections
	if nl.acceptq == nil {
		return nil, ErrLinkNoListen
	}

	select {
	case <-nl.down:
		if atomic.LoadUint32(&nl.closed) != 0 {
			return nil, ErrLinkClosed
		}
		return nil, ErrLinkDown

	case c := <-nl.acceptq:
		return c, nil
	}
}

// errRecvShutdown returns the appropriate error when c.down is found ready in recvPkt.
func (c *Conn) errRecvShutdown() error {
	switch {
	case atomic.LoadUint32(&c.closed) != 0:
		return ErrClosedConn

	case atomic.LoadUint32(&c.nodeLink.closed) != 0:
		return ErrLinkClosed

	default:
		// we have to check what the particular RX error was on nodelink shutdown.
		// only do that once - after reporting the RX error the first time,
		// tell the client the node link is no longer operational.
		var err error
		c.rxerrOnce.Do(func() {
			c.nodeLink.errMu.Lock()
			err = c.nodeLink.errRecv
			c.nodeLink.errMu.Unlock()
		})
		if err == nil {
			err = ErrLinkDown
		}
		return err
	}
}

// recvPkt receives a raw packet from the connection.
func (c *Conn) recvPkt() (*PktBuf, error) {
	select {
	case <-c.down:
		return nil, c.err("recv", c.errRecvShutdown())

	case pkt := <-c.rxq:
		return pkt, nil
	}
}
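// Informal sketch of the rx path, for orientation (not code used by the
// package): serveRecv, below, is the single reader of the raw link and
// routes each packet to the connection registered under the packet's connId:
//
//	wire -> NodeLink.recvPkt -> serveRecv -> connTab[connId].rxq
//	                                                  |
//	                             Conn.recvPkt <-------+   (Conn.Recv decodes it)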
// serveRecv handles incoming packets, routing them either to the appropriate
// already-established connection or, if the node link is accepting incoming
// connections, to a new connection put on the accept queue.
func (nl *NodeLink) serveRecv() {
	defer nl.serveWg.Done()
	for {
		// receive 1 packet
		// XXX if nl.peerLink was just closed by tx->shutdown we'll get ErrNetClosing
		pkt, err := nl.recvPkt()
		//fmt.Printf("recvPkt -> %v, %v\n", pkt, err)
		if err != nil {
			// on IO error framing over peerLink becomes broken
			// so we shut down node link and all connections over it.
			nl.errMu.Lock()
			nl.errRecv = err
			nl.errMu.Unlock()
			nl.shutdown()
			return
		}

		// pkt.ConnId -> Conn
		connId := ntoh32(pkt.Header().ConnId)
		accept := false

		nl.connMu.Lock()

		// connTab is never nil here - because shutdown, before
		// resetting it, waits for us to finish.
		conn := nl.connTab[connId]

		if conn == nil {
			if nl.acceptq != nil {
				// we are accepting a new incoming connection
				conn = nl.newConn(connId)
				accept = true
			}
		}

		// we have not accepted the incoming connection - ignore the packet
		if conn == nil {
			// XXX also log / increment counter?
			nl.connMu.Unlock()
			continue
		}

		// route packet to serving goroutine handler
		//
		// TODO backpressure when Recv is not keeping up with Send on peer side?
		//      (not to let the whole nodelink starve because of one connection)
		//
		// NOTE rxq must be buffered with at least 1 element so that
		// queuing pkt succeeds for an incoming connection that is not yet
		// there in acceptq.
		conn.rxq <- pkt

		// keep connMu locked until here: so that ^^^ `conn.rxq <- pkt` can be
		// sure conn stays not down, e.g. closed by Conn.Close or NodeLink.shutdown
		//
		// XXX try to release connMu earlier - before `rxq <- pkt`
		nl.connMu.Unlock()

		if accept {
			select {
			case <-nl.down:
				// Accept and the loop calling it can exit if shutdown was requested;
				// if so we are also exiting

				// make sure not to leave rx error as nil
				nl.errMu.Lock()
				nl.errRecv = ErrLinkDown
				nl.errMu.Unlock()

				return

			case nl.acceptq <- conn:
				// ok
			}
		}
	}
}

// txReq is a request to transmit a packet. The result error goes back to errch.
type txReq struct {
	pkt   *PktBuf
	errch chan error
}

// errSendShutdown returns the appropriate error when c.down is found ready in Send.
func (c *Conn) errSendShutdown() error {
	switch {
	case atomic.LoadUint32(&c.closed) != 0:
		return ErrClosedConn

	// the only other error possible besides Conn being .Close()'ed is that
	// NodeLink was closed/shut down itself - on actual IO problems the corresponding
	// error is delivered to the particular Send that caused it.
	case atomic.LoadUint32(&c.nodeLink.closed) != 0:
		return ErrLinkClosed

	default:
		return ErrLinkDown
	}
}

// sendPkt sends a raw packet via the connection.
func (c *Conn) sendPkt(pkt *PktBuf) error {
	err := c.sendPkt2(pkt)
	return c.err("send", err)
}

func (c *Conn) sendPkt2(pkt *PktBuf) error {
	// set pkt connId associated with this connection
	pkt.Header().ConnId = hton32(c.connId)
	var err error

	select {
	case <-c.down:
		return c.errSendShutdown()

	case c.nodeLink.txq <- txReq{pkt, c.txerr}:
		select {
		// tx request was sent to serveSend and is being transmitted on the wire.
		// the transmission may block indefinitely long though, and
		// we cannot interrupt it, as the only way to interrupt is
		// .nodeLink.Close() which would close all other Conns.
		//
		// That's why we are also checking for c.down while waiting
		// for a reply from serveSend (and leave pkt to finish transmitting).
		//
		// NOTE if we return straight here, serveSend won't later be
		// blocked on c.txerr<- because that backchannel is a non-blocking one.
		case <-c.down:
			// also poll c.txerr here because: when there is a TX error,
			// serveSend sends to c.txerr _and_ closes c.down.
			// We still want to return the actual transmission error to the caller.
			select {
			case err = <-c.txerr:
				return err
			default:
				return c.errSendShutdown()
			}

		case err = <-c.txerr:
			return err
		}
	}
}
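// Informal sketch of the tx path, for orientation (not code used by the
// package): every Conn funnels its packets through the link-wide txq, and
// serveSend, below, is the single writer of the raw link; the result of each
// transmission travels back over the per-Conn txerr channel:
//
//	Conn.sendPkt -> txq -> serveSend -> NodeLink.sendPkt -> wire
//	      ^                    |
//	      +------- txerr ------+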
// serveSend handles requests to transmit packets from client connections and
// serially executes them over the associated node link.
func (nl *NodeLink) serveSend() {
	defer nl.serveWg.Done()
	for {
		select {
		case <-nl.down:
			return

		case txreq := <-nl.txq:
			// XXX if nl.peerLink was just closed by rx->shutdown we'll get ErrNetClosing
			err := nl.sendPkt(txreq.pkt)
			//fmt.Printf("sendPkt -> %v\n", err)

			txreq.errch <- err

			// on IO error framing over peerLink becomes broken
			// so we shut down node link and all connections over it.
			if err != nil {
				nl.shutdown()
				return
			}
		}
	}
}

// ---- raw IO ----

const dumpio = true

// sendPkt sends a raw packet to the peer.
// The tx error, if any, is returned as is and is analyzed in serveSend.
func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
	if dumpio {
		// XXX -> log
		fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
		//defer fmt.Printf("\t-> sendPkt err: %v\n", err)
	}

	// NOTE Write writes data in full, or it is an error
	_, err := nl.peerLink.Write(pkt.Data)
	return err
}

var ErrPktTooBig = errors.New("packet too big")

// recvPkt receives a raw packet from the peer.
// The rx error, if any, is returned as is and is analyzed in serveRecv.
func (nl *NodeLink) recvPkt() (*PktBuf, error) {
	// TODO organize rx buffers management (freelist etc)

	// first read to fetch pkt header and hopefully up to a page of data in 1 syscall
	pkt := &PktBuf{make([]byte, 4096)}
	// TODO reenable, but NOTE next packet can be also prefetched here -> use buffering ?
	//n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, PktHeadLen)
	n, err := io.ReadFull(nl.peerLink, pkt.Data[:PktHeadLen])
	if err != nil {
		return nil, err
	}

	pkth := pkt.Header()

	// XXX -> better PktHeader.Decode() ?
	pktLen := PktHeadLen + ntoh32(pkth.MsgLen) // .MsgLen is payload-only length without header
	if pktLen > MAX_PACKET_SIZE {
		return nil, ErrPktTooBig
	}

	// XXX -> pkt.Data = xbytes.Resize32(pkt.Data[:n], pktLen)
	if pktLen > uint32(cap(pkt.Data)) {
		// grow rxbuf
		rxbuf2 := make([]byte, pktLen)
		copy(rxbuf2, pkt.Data[:n])
		pkt.Data = rxbuf2
	}
	// cut .Data len to length of packet
	pkt.Data = pkt.Data[:pktLen]

	// read the rest of pkt data, if we need to
	if n < len(pkt.Data) {
		_, err = io.ReadFull(nl.peerLink, pkt.Data[n:])
		if err != nil {
			return nil, err
		}
	}

	if dumpio {
		// XXX -> log
		fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
	}

	return pkt, nil
}

// ---- Handshake ----

// Handshake performs the NEO protocol handshake just after a raw connection
// between 2 nodes has been established.
// On success the raw connection is returned wrapped into NodeLink.
// On error the raw connection is closed.
func Handshake(ctx context.Context, conn net.Conn, role LinkRole) (nl *NodeLink, err error) {
	err = handshake(ctx, conn, PROTOCOL_VERSION)
	if err != nil {
		return nil, err
	}

	// handshake ok -> NodeLink
	return newNodeLink(conn, role), nil
}

// HandshakeError is returned when there is an error while performing the handshake.
type HandshakeError struct {
	// XXX just keep .Conn? (but .Conn can be closed)
	LocalAddr  net.Addr
	RemoteAddr net.Addr
	Err        error
}

func (e *HandshakeError) Error() string {
	return fmt.Sprintf("%s - %s: handshake: %s", e.LocalAddr, e.RemoteAddr, e.Err.Error())
}
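// Informal sketch of the handshake exchange implemented below (not code used
// by the package): both sides concurrently send their 4-byte big-endian
// protocol version and read the peer's word; the link is considered
// handshaked only if the two versions match:
//
//	A -> B: uint32be(PROTOCOL_VERSION of A)
//	B -> A: uint32be(PROTOCOL_VERSION of B)
//	ok iff both sides read the same version they sent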
func handshake(ctx context.Context, conn net.Conn, version uint32) (err error) {
	errch := make(chan error, 2)

	// tx handshake word
	txWg := sync.WaitGroup{}
	txWg.Add(1)
	go func() {
		var b [4]byte
		binary.BigEndian.PutUint32(b[:], version) // XXX -> hton32 ?
		_, err := conn.Write(b[:])
		// XXX EOF -> ErrUnexpectedEOF ?
		errch <- err
		txWg.Done()
	}()

	// rx handshake word
	go func() {
		var b [4]byte
		_, err := io.ReadFull(conn, b[:])
		if err == io.EOF {
			err = io.ErrUnexpectedEOF // can be returned with n = 0
		}
		if err == nil {
			peerVersion := binary.BigEndian.Uint32(b[:]) // XXX -> ntoh32 ?
			if peerVersion != version {
				err = fmt.Errorf("protocol version mismatch: peer = %08x ; our side = %08x", peerVersion, version)
			}
		}
		errch <- err
	}()

	connClosed := false
	defer func() {
		// make sure our version is always sent on the wire, if possible,
		// so that the peer does not see just a closed connection when on rx we see a version mismatch
		//
		// NOTE if cancelled, the tx goroutine will wake up without delay
		txWg.Wait()

		// don't forget to close conn if returning with error + add handshake err context
		if err != nil {
			err = &HandshakeError{conn.LocalAddr(), conn.RemoteAddr(), err}
			if !connClosed {
				conn.Close()
			}
		}
	}()

	for i := 0; i < 2; i++ {
		select {
		case <-ctx.Done():
			conn.Close() // interrupt IO
			connClosed = true
			return ctx.Err()

		case err = <-errch:
			if err != nil {
				return err
			}
		}
	}

	// handshaked ok
	return nil
}

// ---- Dial & Listen at raw NodeLink level ----

// DialLink connects to an address on the given network, handshakes and wraps the connection as NodeLink.
func DialLink(ctx context.Context, net xnet.Networker, addr string) (nl *NodeLink, err error) {
	peerConn, err := net.Dial(ctx, addr)
	if err != nil {
		return nil, err
	}

	return Handshake(ctx, peerConn, LinkClient)
}

// ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink.
// The listener accepts only those connections that pass the handshake.
func ListenLink(net xnet.Networker, laddr string) (*LinkListener, error) {
	rawl, err := net.Listen(laddr)
	if err != nil {
		return nil, err
	}

	l := &LinkListener{
		l:       rawl,
		acceptq: make(chan linkAccepted),
		closed:  make(chan struct{}),
	}
	go l.run()

	return l, nil
}

// LinkListener wraps net.Listener to return a handshaked NodeLink on Accept.
// Create only via ListenLink.
type LinkListener struct {
	l       net.Listener
	acceptq chan linkAccepted
	closed  chan struct{}
}

type linkAccepted struct {
	link *NodeLink
	err  error
}

func (l *LinkListener) Close() error {
	err := l.l.Close()
	close(l.closed)
	return err
}

func (l *LinkListener) run() {
	// context that cancels when the listener stops
	runCtx, runCancel := context.WithCancel(context.Background())
	defer runCancel()

	for {
		// stop on close
		select {
		case <-l.closed:
			return
		default:
		}

		// XXX add backpressure on too many incoming connections without client .Accept ?
		conn, err := l.l.Accept()
		go l.accept(runCtx, conn, err)
	}
}

func (l *LinkListener) accept(ctx context.Context, conn net.Conn, err error) {
	link, err := l.accept1(ctx, conn, err)

	select {
	case l.acceptq <- linkAccepted{link, err}:
		// ok

	case <-l.closed:
		// shutdown
		if link != nil {
			link.Close()
		}
	}
}

func (l *LinkListener) accept1(ctx context.Context, conn net.Conn, err error) (*NodeLink, error) {
	// XXX err ctx?
	if err != nil {
		return nil, err
	}

	// NOTE Handshake closes conn in case of failure
	link, err := Handshake(ctx, conn, LinkServer)
	if err != nil {
		return nil, err
	}

	return link, nil
}

func (l *LinkListener) Accept() (*NodeLink, error) {
	select {
	case <-l.closed:
		// we know the raw listener is already closed - return a proper error about it
		_, err := l.l.Accept()
		return nil, err

	case a := <-l.acceptq:
		return a.link, a.err
	}
}

func (l *LinkListener) Addr() net.Addr {
	return l.l.Addr()
}
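// Informal usage sketch for DialLink/ListenLink (not code used by the
// package); `netw` is assumed to be some xnet.Networker and the address is
// illustrative:
//
//	// server side
//	l, err := ListenLink(netw, "127.0.0.1:3000")
//	// ...
//	link, err := l.Accept() // handshaked *NodeLink
//
//	// client side
//	link, err := DialLink(ctx, netw, "127.0.0.1:3000")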
// ---- for convenience: Conn -> NodeLink & local/remote link addresses ----

// LocalAddr returns the local address of the underlying link to peer.
func (nl *NodeLink) LocalAddr() net.Addr {
	return nl.peerLink.LocalAddr()
}

// RemoteAddr returns the remote address of the underlying link to peer.
func (nl *NodeLink) RemoteAddr() net.Addr {
	return nl.peerLink.RemoteAddr()
}

// Link returns the underlying NodeLink of this connection.
func (c *Conn) Link() *NodeLink {
	return c.nodeLink
}

// ConnID returns the connection identifier used for the connection.
func (c *Conn) ConnID() uint32 {
	return c.connId
}

// ---- for convenience: String / Error ----

func (nl *NodeLink) String() string {
	s := fmt.Sprintf("%s - %s", nl.LocalAddr(), nl.RemoteAddr())
	return s // XXX add "(closed)" if nl is closed ?
	// XXX other flags e.g. (down) ?
}

func (c *Conn) String() string {
	s := fmt.Sprintf("%s .%d", c.nodeLink, c.connId)
	return s // XXX add "(closed)" if c is closed ?
}

func (e *LinkError) Error() string {
	return fmt.Sprintf("%s: %s: %s", e.Link, e.Op, e.Err)
}

func (e *ConnError) Error() string {
	return fmt.Sprintf("%s: %s: %s", e.Conn, e.Op, e.Err)
}

func (nl *NodeLink) err(op string, e error) error {
	if e == nil {
		return nil
	}
	return &LinkError{Link: nl, Op: op, Err: e}
}

func (c *Conn) err(op string, e error) error {
	if e == nil {
		return nil
	}
	return &ConnError{Conn: c, Op: op, Err: e}
}

// ---- exchange of messages ----

//trace:event traceConnRecv(c *Conn, msg Msg)
//trace:event traceConnSendPre(c *Conn, msg Msg)
// XXX do we also need traceConnSend?

// Recv receives a message:
// it receives a packet and decodes the message from it.
func (c *Conn) Recv() (Msg, error) {
	// TODO use freelist for PktBuf
	pkt, err := c.recvPkt()
	if err != nil {
		return nil, err
	}

	// decode packet
	pkth := pkt.Header()
	msgCode := ntoh16(pkth.MsgCode)

	msgType := msgTypeRegistry[msgCode]
	if msgType == nil {
		err = fmt.Errorf("invalid msgCode (%d)", msgCode)
		// XXX "decode" -> "recv: decode"?
		return nil, &ConnError{Conn: c, Op: "decode", Err: err}
	}

	// TODO use free-list for decoded messages + when possible decode in-place
	msg := reflect.New(msgType).Interface().(Msg)
	_, err = msg.neoMsgDecode(pkt.Payload())
	if err != nil {
		return nil, &ConnError{Conn: c, Op: "decode", Err: err}
	}

	traceConnRecv(c, msg)
	return msg, nil
}

// Send sends a message:
// it encodes the message into a packet and sends it.
func (c *Conn) Send(msg Msg) error {
	traceConnSendPre(c, msg)

	l := msg.neoMsgEncodedLen()
	buf := PktBuf{make([]byte, PktHeadLen+l)} // TODO -> freelist

	h := buf.Header()
	// h.ConnId will be set by conn.Send
	h.MsgCode = hton16(msg.neoMsgCode())
	h.MsgLen = hton32(uint32(l)) // XXX casting: think again

	msg.neoMsgEncode(buf.Payload())

	// XXX why pointer?
	// XXX more context in err? (msg type)
	return c.sendPkt(&buf)
}

// Expect receives a message and checks it is one of the expected types.
//
// If verification is successful the message is decoded in place and which is
// returned, indicating the index of the received message in msgv.
//
// On error (-1, err) is returned.
func (c *Conn) Expect(msgv ...Msg) (which int, err error) {
	// XXX a bit dup wrt Recv

	// TODO use freelist for PktBuf
	pkt, err := c.recvPkt()
	if err != nil {
		return -1, err
	}

	pkth := pkt.Header()
	msgCode := ntoh16(pkth.MsgCode)

	for i, msg := range msgv {
		if msg.neoMsgCode() == msgCode {
			_, err = msg.neoMsgDecode(pkt.Payload())
			if err != nil {
				return -1, &ConnError{Conn: c, Op: "decode", Err: err}
			}
			return i, nil
		}
	}

	// unexpected message
	msgType := msgTypeRegistry[msgCode]
	if msgType == nil {
		return -1, &ConnError{c, "decode", fmt.Errorf("invalid msgCode (%d)", msgCode)}
	}

	// XXX also add which messages were expected ?
	return -1, &ConnError{c, "recv", fmt.Errorf("unexpected message: %v", msgType)}
}
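// Informal usage sketch for Expect (not code used by the package);
// AnswerFoo and AnswerBar are hypothetical message types standing in for
// concrete Msg implementations:
//
//	var foo AnswerFoo
//	var bar AnswerBar
//	which, err := conn.Expect(&foo, &bar)
//	switch which {
//	case 0: // foo was received and decoded in place
//	case 1: // bar was received and decoded in place
//	}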
// Ask sends a request and receives a response.
// It expects the response to be exactly of resp type and errors otherwise.
// XXX clarify error semantic (when Error is decoded)
// XXX do the same as Expect wrt respv ?
func (c *Conn) Ask(req Msg, resp Msg) error {
	err := c.Send(req)
	if err != nil {
		return err
	}

	nerr := &Error{}
	which, err := c.Expect(resp, nerr)
	switch which {
	case 0:
		return nil
	case 1:
		return ErrDecode(nerr)
	}

	return err
}
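// Informal usage sketch for Ask (not code used by the package); GetObject
// and AnswerObject are hypothetical request/answer message types:
//
//	req := &GetObject{ /* ... */ }
//	resp := &AnswerObject{}
//	if err := conn.Ask(req, resp); err != nil {
//		// err is an IO/link error, a NEO Error decoded from the peer
//		// (via ErrDecode), or "unexpected message".
//	}
//	// resp is filled in on success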