connection.go 29.6 KB
Newer Older
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1 2
// Copyright (C) 2016-2017  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
3 4
//
// This program is free software: you can Use, Study, Modify and Redistribute
5
// it under the terms of the GNU General Public License version 3, or (at your
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
6 7
// option) any later version, as published by the Free Software Foundation.
//
8
// You can also Link and Combine this program with other software covered by
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
9 10 11 12
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
13
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
14 15 16 17
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
18
// See https://www.nexedi.com/licensing for rationale and options.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
19 20

package neo
21
// Connection management
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
22 23

import (
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
24
	"context"
25
	"encoding/binary"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
26
	"errors"
27
	"fmt"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
28
	"io"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
29
	"math"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
30
	"net"
31
	"reflect"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
32
	"sync"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
33
	"sync/atomic"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
34
	"time"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
35

36
	"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
37 38
)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
39
// NodeLink is a node-node link in NEO
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
40
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
41 42 43 44
// A node-node link represents bidirectional symmetrical communication
// channel in between 2 NEO nodes. The link provides service for packets
// exchange and for multiplexing several communication connections on
// top of the node-node link.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
45
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
46 47
// New connection can be created with .NewConn() . Once connection is
// created and data is sent over it, on peer's side another corresponding
48 49
// new connection can be accepted via .Accept(), and all further communication
// send/receive exchange will be happening in between those 2 connections.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
50
//
51
// For a node to be able to accept new incoming connection it has to have
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
52
// "server" role - see newNodeLink() for details. XXX might change to everyone is able to accept.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
53
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
54
// A NodeLink has to be explicitly closed, once it is no longer needed.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
55
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
56
// It is safe to use NodeLink from multiple goroutines simultaneously.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
57
type NodeLink struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
58
	peerLink net.Conn // raw conn to peer
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
59

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
60
	connMu     sync.Mutex
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
61 62
	connTab    map[uint32]*Conn // connId -> Conn associated with connId
	nextConnId uint32           // next connId to use for Conn initiated by us
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
63

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
64 65 66 67 68
	serveWg sync.WaitGroup	// for serve{Send,Recv}
	acceptq chan *Conn	// queue of incoming connections for Accept
				// = nil if NodeLink is not accepting connections
	txq	chan txReq	// tx requests from Conns go via here
				// (rx packets are routed to Conn.rxq)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
69

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
70 71 72 73
	down     chan struct{}  // ready when NodeLink is marked as no longer operational
	downOnce sync.Once      // shutdown may be due to both Close and IO error
	downWg   sync.WaitGroup // for activities at shutdown
	errClose error          // error got from peerLink.Close
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
74

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
75 76
	errMu    sync.Mutex
	errRecv	 error		// error got from recvPkt on shutdown
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
77

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
78
	closed   uint32		// whether Close was called
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
79 80
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
81 82 83 84 85
// Conn is a connection established over NodeLink
//
// Data can be sent and received over it.
// Once connection is no longer needed it has to be closed.
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
86
// It is safe to use Conn from multiple goroutines simultaneously.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
87
type Conn struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
88
	nodeLink  *NodeLink
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
89
	connId    uint32
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
90
	rxq	  chan *PktBuf	// received packets for this Conn go here
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
91
	txerr     chan error	// transmit results for this Conn go back here
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
92

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
93 94 95
	down      chan struct{} // ready when Conn is marked as no longer operational
	downOnce  sync.Once	// shutdown may be called by both Close and nodelink.shutdown

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
96
	rxerrOnce sync.Once     // rx error is reported only once - then it is link down or closed
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
97 98 99 100
	closed    int32         // 1 if Close was called or "connection closed" entry
				// incremented during every replyNoConn() in progress

	errMsg	  *Error	// error message for replyNoConn
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
101 102
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
103

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
104 105
// Sentinel errors reported by NodeLink and Conn operations.
var (
	ErrLinkClosed   = errors.New("node link is closed") // operations on closed NodeLink
	ErrLinkDown     = errors.New("node link is down")   // e.g. due to IO error
	ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
	ErrLinkManyConn = errors.New("too many opened connections")
	ErrClosedConn   = errors.New("connection is closed")
)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
109

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
110 111 112 113
// XXX unify LinkError & ConnError -> NetError?
// (think from point of view how user should be handling errors)
// XXX or it is good to be able to distinguish between only conn error vs whole-link error?

114 115 116 117 118 119 120 121 122 123 124 125 126
// LinkError is usually returned by NodeLink operations.
//
// NOTE(review): NodeLink methods in this file report failures through
// nl.err(op, err), passing op values such as "newconn", "accept" and "close";
// nl.err is defined outside this view and presumably builds a LinkError from
// them - confirm there.
type LinkError struct {
	Link *NodeLink // presumably the link the failed operation ran on
	Op   string    // presumably the name of the failed operation
	Err  error     // presumably the underlying cause
}

// ConnError is usually returned by Conn operations.
//
// NOTE(review): Conn methods in this file report failures through
// c.err(op, err), passing op values such as "recv" and "send"; c.err is
// defined outside this view and presumably builds a ConnError from them -
// confirm there.
type ConnError struct {
	Conn *Conn  // presumably the connection the failed operation ran on
	Op   string // presumably the name of the failed operation
	Err  error  // presumably the underlying cause
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
127

128
// LinkRole is a role an end of NodeLink is intended to play.
//
// The low 16 bits carry the role itself (LinkServer or LinkClient); bits from
// 16 upwards are flags that may be or'ed into the role.
type LinkRole int

const (
	LinkServer LinkRole = iota // link created as server
	LinkClient                 // link created as client

	// for testing:
	linkNoRecvSend LinkRole = 1 << 16 // do not spawn serveRecv & serveSend

	// linkFlagsMask selects all flag bits, i.e. everything above the low
	// 16 role bits; `role &^ linkFlagsMask` yields the bare role.
	//
	// It is derived via ^ from the role bits instead of being spelled as
	// (1<<32 - 1) << 16: the latter does not fit LinkRole where int is
	// 32 bits wide and makes the package fail to compile there.
	linkFlagsMask LinkRole = ^LinkRole(1<<16 - 1)
)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
139
/*
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
140 141 142 143 144 145 146 147 148 149 150 151 152
// LinkFlags allow to customize NodeLink behaviour
type LinkFlags int
const (
	// LinkListen tells link to accept incoming connections.
	//
	// NOTE it is valid use-case even for link originating through DialLink
	// to accept incoming connections over established channel.
	//
	// NOTE listen put to flags - not e.g. link.Listen() call - because
	// otherwise e.g. for client originated links if after DialLink client
	// calls link.Listen() there is a race window: before Listen is called
	// in which peer could start connecting to our side.
	LinkListen LinkFlags = 1 << iota
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
153

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
154
	// for testing:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
155
	linkNoRecvSend LinkFlags = 1 << 16 // do not spawn serveRecv & serveSend
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
156
)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
157
*/
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
158

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
159
// newNodeLink makes a new NodeLink from already established net.Conn
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
160
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
161
// Role specifies how to treat our role on the link - either as client or
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
162
// server. The difference in between client and server roles is in:
163
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
164
//    how connection ids are allocated for connections initiated at our side:
165 166 167
//    there is no conflict in identifiers if one side always allocates them as
//    even (server) and its peer as odd (client).
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
168 169
// Usually server role should be used for connections created via
// net.Listen/net.Accept and client role for connections created via net.Dial.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
170 171 172
//
// Though it is possible to wrap just-established raw connection into NodeLink,
// users should always use Handshake which performs protocol handshaking first.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
173
func newNodeLink(conn net.Conn, role LinkRole) *NodeLink {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
174
	var nextConnId uint32
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
175 176
	var acceptq chan *Conn
	switch role &^ linkFlagsMask {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
177
	case LinkServer:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
178
		nextConnId = 0             // all initiated by us connId will be even
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
179
		acceptq = make(chan *Conn) // accept queue; TODO use backlog?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
180
	case LinkClient:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
181
		nextConnId = 1 // ----//---- odd
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
182
		acceptq = nil  // not accepting incoming connections
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
183 184 185 186
	default:
		panic("invalid conn role")
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
187
	nl := &NodeLink{
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
188 189 190
		peerLink:   conn,
		connTab:    map[uint32]*Conn{},
		nextConnId: nextConnId,
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
191
		acceptq:    acceptq,	// XXX reenable make(chan *Conn), // accepting initially
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
192
		txq:        make(chan txReq),
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
193
		down:       make(chan struct{}),
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
194
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
195
	if role&linkNoRecvSend == 0 {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
196 197 198 199
		nl.serveWg.Add(2)
		go nl.serveRecv()
		go nl.serveSend()
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
200
	return nl
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
201 202
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
203 204 205 206 207
// newConn creates new Conn with id=connId and registers it into connTab.
// Must be called with connMu held.
func (nl *NodeLink) newConn(connId uint32) *Conn {
	c := &Conn{nodeLink: nl,
		connId: connId,
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
208 209 210
		rxq:    make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv
		txerr:  make(chan error, 1),   // NOTE non-blocking - see Conn.Send
		down:   make(chan struct{}),
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
211 212 213 214 215 216
	}
	nl.connTab[connId] = c
	return c
}

// NewConn creates new connection on top of node-node link
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
217
func (nl *NodeLink) NewConn() (*Conn, error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
218 219 220
	nl.connMu.Lock()
	defer nl.connMu.Unlock()
	if nl.connTab == nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
221
		if atomic.LoadUint32(&nl.closed) != 0 {
222
			return nil, nl.err("newconn", ErrLinkClosed)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
223
		}
224
		return nil, nl.err("newconn", ErrLinkDown)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
225
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240

	// nextConnId could wrap around uint32 limits - find first free slot to
	// not blindly replace existing connection
	for i := uint32(0) ;; i++ {
		_, exists := nl.connTab[nl.nextConnId]
		if !exists {
			break
		}
		nl.nextConnId += 2

		if i > math.MaxUint32 / 2 {
			return nil, nl.err("newconn", ErrLinkManyConn)
		}
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
241 242
	c := nl.newConn(nl.nextConnId)
	nl.nextConnId += 2
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
243

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
244
	return c, nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
245 246
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
247
// shutdown closes raw link to peer and marks NodeLink as no longer operational.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
248
// it also shutdowns all opened connections over this node link.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
249
func (nl *NodeLink) shutdown() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
250
	nl.downOnce.Do(func() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
251
		close(nl.down)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
252

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
253 254
		// close actual link to peer. this will wakeup {send,recv}Pkt
		// NOTE we need it here so that e.g. aborting on error in serveSend wakes up serveRecv
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
255
		nl.errClose = nl.peerLink.Close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
256

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
257
		nl.downWg.Add(1)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
258
		go func() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
259
			defer nl.downWg.Done()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
260

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
261 262 263 264
			// wait for serve{Send,Recv} to complete before shutting connections down
			//
			// we have to do it so that e.g. serveSend has chance
			// to return last error from sendPkt to requester.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
265 266 267 268
			nl.serveWg.Wait()

			nl.connMu.Lock()
			for _, conn := range nl.connTab {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
269
				// NOTE anything waking up on Conn.down must not lock
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
270
				// connMu - else it will deadlock.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
271
				conn.shutdown()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
272
			}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
273
			nl.connTab = nil // clear + mark down
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
274 275
			nl.connMu.Unlock()
		}()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
276 277 278
	})
}

279
// Close closes node-node link.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
280 281
// All blocking operations - Accept and IO on associated connections
// established over node link - are automatically interrupted with an error.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
282
// Underlying raw connection is closed.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
283
// It is safe to call Close several times
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
284
func (nl *NodeLink) Close() error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
285
	atomic.StoreUint32(&nl.closed, 1)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
286
	nl.shutdown()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
287
	nl.downWg.Wait()
288
	return nl.err("close", nl.errClose)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
289 290
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
291 292 293 294
// shutdown marks connection as no longer operational
func (c *Conn) shutdown() {
	c.downOnce.Do(func() {
		close(c.down)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
295
	})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
296 297
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
298 299 300

// connKeepClosed is how long Conn.Close keeps a "closed" placeholder entry in
// connTab for a connection that was initiated by peer.
var connKeepClosed = time.Minute

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
301
// Close closes connection.
302
// Any blocked Send*() or Recv*() will be unblocked and return error
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
303 304
//
// NOTE for Send() - once transmission was started - it will complete in the
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
305
// background on the wire not to break node-node link framing.
306
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
307
// It is safe to call Close several times.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
308
func (c *Conn) Close() error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323
	nl := c.nodeLink

	// adjust nodeLink.connTab
	nl.connMu.Lock()
	if nl.connTab != nil {
		// connection was initiated by us - simply delete - we always
		// know if a packet comes to such connection it is closed.
		if c.connId == nl.nextConnId % 2 {
			delete(nl.connTab, c.connId)

		// connection was initiated by peer which we accepted - put special
		// "closed" connection into connTab entry for some time to reply
		// "connection closed" if another packet comes to it.
		} else {
			cc := nl.newConn(c.connId)
324
			// cc.closed=1 so that cc is not freed by replyNoConn
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
325
			// NOTE cc.down stays not closed so Send could work
326 327
			atomic.StoreInt32(&cc.closed, 1)
			cc.errMsg = errConnClosed
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
328 329 330 331 332 333 334 335 336
			time.AfterFunc(connKeepClosed, func() {
				nl.connMu.Lock()
				delete(nl.connTab, cc.connId)
				nl.connMu.Unlock()

				cc.shutdown()
			})
		}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
337
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
338
	nl.connMu.Unlock()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
339

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
340
	atomic.StoreInt32(&c.closed, 1)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
341
	c.shutdown()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
342
	return nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
343 344
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
345 346
// ---- receive ----

347
// Accept waits for and accepts incoming connection on top of node-node link
348 349 350 351 352 353 354
func (nl *NodeLink) Accept() (c *Conn, err error) {
	defer func() {
		if err != nil {
			err = nl.err("accept", err)
		}
	}()

355 356 357 358 359 360
	// this node link is not accepting connections
	if nl.acceptq == nil {
		return nil, ErrLinkNoListen
	}

	select {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
361
	case <-nl.down:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
362
		if atomic.LoadUint32(&nl.closed) != 0 {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
363
			return nil, ErrLinkClosed
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
364
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
365
		return nil, ErrLinkDown
366

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
367
	case c := <-nl.acceptq:
368 369 370 371
		return c, nil
	}
}

372
// errRecvShutdown returns appropriate error when c.down is found ready in recvPkt
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
373
func (c *Conn) errRecvShutdown() error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
374
	switch {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
375
	case atomic.LoadInt32(&c.closed) != 0:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
376 377
		return ErrClosedConn

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
378
	case atomic.LoadUint32(&c.nodeLink.closed) != 0:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
379 380 381
		return ErrLinkClosed

	default:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
382 383
		// we have to check what was particular RX error on nodelink shutdown
		// only do that once - after reporting RX error the first time
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
384 385 386 387 388 389 390 391
		// tell client the node link is no longer operational.
		var err error
		c.rxerrOnce.Do(func() {
			c.nodeLink.errMu.Lock()
			err = c.nodeLink.errRecv
			c.nodeLink.errMu.Unlock()
		})
		if err == nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
392
			err = ErrLinkDown
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
393 394 395 396 397
		}
		return err
	}
}

398 399
// recvPkt receives raw packet from connection
func (c *Conn) recvPkt() (*PktBuf, error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
400
	select {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
401
	case <-c.down:
402
		return nil, c.err("recv", c.errRecvShutdown())
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
403

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
404 405
	case pkt := <-c.rxq:
		return pkt, nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
406 407
	}
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
408

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
409
// serveRecv handles incoming packets routing them to either appropriate
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
410 411
// already-established connection or, if node link is accepting incoming
// connections, to new connection put to accept queue.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
412
func (nl *NodeLink) serveRecv() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
413
	defer nl.serveWg.Done()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
414 415
	for {
		// receive 1 packet
416
		// XXX if nl.peerLink was just closed by tx->shutdown we'll get ErrNetClosing
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
417
		pkt, err := nl.recvPkt()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
418
		//fmt.Printf("recvPkt -> %v, %v\n", pkt, err)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
419
		if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
420
			// on IO error framing over peerLink becomes broken
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
421
			// so we shut down node link and all connections over it.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
422

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
423 424 425
			nl.errMu.Lock()
			nl.errRecv = err
			nl.errMu.Unlock()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
426

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
427
			nl.shutdown()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
428
			return
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
429 430
		}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
431 432
		// pkt.ConnId -> Conn
		connId := ntoh32(pkt.Header().ConnId)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
433
		accept := false
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
434 435

		nl.connMu.Lock()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
436

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
437
		// connTab is never nil here - because shutdown before
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
438
		// resetting it waits for us to finish.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
439 440
		conn := nl.connTab[connId]

441
		//fmt.Printf("RX .%d -> %v\n", connId, conn)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
442
		if conn == nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
443 444
			// "new" connection will be needed in all cases - e.g.
			// temporarily to reply "connection refused"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
445 446
			conn = nl.newConn(connId)

447 448
			//fmt.Printf("connId: %d (%d)\n", connId, connId % 2)
			//fmt.Printf("nextConnId: %d (%d)\n", nl.nextConnId, nl.nextConnId % 2)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
449 450 451 452 453 454 455 456 457 458 459 460 461 462

			// message with connid that should be initiated by us
			if connId % 2 == nl.nextConnId % 2 {
				conn.errMsg = errConnClosed

			// message with connid for a stream initiated by peer
			} else {
				if nl.acceptq == nil {
					conn.errMsg = errConnRefused
				} else {
					// we are accepting new incoming connection
					accept = true
				}

463
				//fmt.Println("ZZZ", conn.errMsg, accept)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
464 465
			}
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
466

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
467 468
		// we are not accepting packet in any way
		if conn.errMsg != nil {
469
			//fmt.Printf(".%d EMSG: %v\n", connId, conn.errMsg)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
470
			atomic.AddInt32(&conn.closed, 1)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
471
			nl.connMu.Unlock()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
472
			go conn.replyNoConn()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
473 474
			continue
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
475

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
476
		// route packet to serving goroutine handler
477 478 479
		//
		// TODO backpressure when Recv is not keeping up with Send on peer side?
		//      (not to let whole nodelink starve because of one connection)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
480 481 482 483
		//
		// NOTE rxq must be buffered with at least 1 element so that
		// queuing pkt succeeds for incoming connection that is not yet
		// there in acceptq.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
484
		conn.rxq <- pkt
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
485 486

		// keep connMu locked until here: so that ^^^ `conn.rxq <- pkt` can be
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
487
		// sure conn stays not down e.g. closed by Conn.Close or NodeLink.shutdown
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
488
		//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
489
		// XXX try to release connMu earlier - before `rxq <- pkt`
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
490
		nl.connMu.Unlock()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508

		if accept {
			select {
			case <-nl.down:
				// Accept and loop calling it can exit if shutdown was requested
				// if so we are also exiting

				// make sure not to leave rx error as nil
				nl.errMu.Lock()
				nl.errRecv = ErrLinkDown
				nl.errMu.Unlock()

				return

			case nl.acceptq <- conn:
				// ok
			}
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
509 510 511
	}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
512 513 514 515 516 517
// ---- network replies for closed / refused connections ----

// Error messages replied to peer for packets that come to a closed or a
// refused connection.
var (
	errConnClosed  = &Error{PROTOCOL_ERROR, "connection closed"}
	errConnRefused = &Error{PROTOCOL_ERROR, "connection refused"}
)

// replyNoConn sends error message to peer when a packet was sent to closed / nonexistent connection
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
518 519 520 521
// and removes connection from nodeLink connTab if ekeep==false.
//func (c *Conn) replyNoConn(e Msg, ekeep bool) {
func (c *Conn) replyNoConn() {
	c.Send(c.errMsg) // ignore errors
522
	//fmt.Println("errsend:", err)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
523

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
524 525 526 527
	// remove connTab entry - if all users of this temporary conn created
	// only to send the error are now gone.
	c.nodeLink.connMu.Lock()
	if atomic.AddInt32(&c.closed, -1) == 0 {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
528 529
		delete(c.nodeLink.connTab, c.connId)
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
530
	c.nodeLink.connMu.Unlock()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
531 532 533
}

// ---- transmit ----
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
534

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
535
// txReq is request to transmit a packet. Result error goes back to errch
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
536
type txReq struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
537
	pkt   *PktBuf
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
538 539 540
	errch chan error
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
541
// errSendShutdown returns appropriate error when c.down is found ready in Send
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
542
func (c *Conn) errSendShutdown() error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
543
	switch {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
544
	case atomic.LoadInt32(&c.closed) != 0:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
545 546 547
		return ErrClosedConn

	// the only other error possible besides Conn being .Close()'ed is that
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
548
	// NodeLink was closed/shutdowned itself - on actual IO problems corresponding
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
549 550
	// error is delivered to particular Send that caused it.

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
551
	case atomic.LoadUint32(&c.nodeLink.closed) != 0:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
552 553 554
		return ErrLinkClosed

	default:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
555
		return ErrLinkDown
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
556 557 558
	}
}

559 560 561
// sendPkt sends raw packet via connection
func (c *Conn) sendPkt(pkt *PktBuf) error {
	err := c.sendPkt2(pkt)
562 563 564
	return c.err("send", err)
}

565
func (c *Conn) sendPkt2(pkt *PktBuf) error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
566 567 568 569 570
	// set pkt connId associated with this connection
	pkt.Header().ConnId = hton32(c.connId)
	var err error

	select {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
571 572
	case <-c.down:
		return c.errSendShutdown()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
573 574 575 576 577 578 579 580

	case c.nodeLink.txq <- txReq{pkt, c.txerr}:
		select {
		// tx request was sent to serveSend and is being transmitted on the wire.
		// the transmission may block for indefinitely long though and
		// we cannot interrupt it as the only way to interrupt is
		// .nodeLink.Close() which will close all other Conns.
		//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
581
		// That's why we are also checking for c.down while waiting
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
582 583 584 585
		// for reply from serveSend (and leave pkt to finish transmitting).
		//
		// NOTE after we return straight here serveSend won't be later
		// blocked on c.txerr<- because that backchannel is a non-blocking one.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
586
		case <-c.down:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
587 588

			// also poll c.txerr here because: when there is TX error,
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
589
			// serveSend sends to c.txerr _and_ closes c.down .
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
590 591 592
			// We still want to return actual transmission error to caller.
			select {
			case err = <-c.txerr:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
593
				return err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
594
			default:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
595
				return c.errSendShutdown()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
596 597 598
			}

		case err = <-c.txerr:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
599
			return err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
600 601 602 603
		}
	}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
604 605 606
// serveSend handles requests to transmit packets from client connections and
// serially executes them over associated node link.
func (nl *NodeLink) serveSend() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
607
	defer nl.serveWg.Done()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
608 609
	for {
		select {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
610
		case <-nl.down:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
611 612 613
			return

		case txreq := <-nl.txq:
614
			// XXX if n.peerLink was just closed by rx->shutdown we'll get ErrNetClosing
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
615
			err := nl.sendPkt(txreq.pkt)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
616
			//fmt.Printf("sendPkt -> %v\n", err)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
617

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
618
			txreq.errch <- err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
619

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
620 621
			// on IO error framing over peerLink becomes broken
			// so we shut down node link and all connections over it.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
622
			if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
623
				nl.shutdown()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
624 625
				return
			}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
626 627 628 629
		}
	}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
630

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
631
// ---- raw IO ----
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
632

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
633 634
const dumpio = true

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
635 636 637
// sendPkt sends raw packet to peer
// tx error, if any, is returned as is and is analyzed in serveSend
func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
638
	if dumpio {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
639
		// XXX -> log
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
640
		fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
641 642
		//defer fmt.Printf("\t-> sendPkt err: %v\n", err)
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
643

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
644 645 646 647
	// NOTE Write writes data in full, or it is error
	_, err := nl.peerLink.Write(pkt.Data)
	return err
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
648

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
649
var ErrPktTooBig = errors.New("packet too big")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
650

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
651 652 653 654 655 656 657 658
// recvPkt receives raw packet from peer
// rx error, if any, is returned as is and is analyzed in serveRecv
func (nl *NodeLink) recvPkt() (*PktBuf, error) {
	// TODO organize rx buffers management (freelist etc)

	// first read to read pkt header and hopefully up to page of data in 1 syscall
	pkt := &PktBuf{make([]byte, 4096)}
	// TODO reenable, but NOTE next packet can be also prefetched here -> use buffering ?
Kirill Smelkov's avatar
Kirill Smelkov committed
659
	//n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, PktHeadLen)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
660 661 662
	n, err := io.ReadFull(nl.peerLink, pkt.Data[:PktHeadLen])
	if err != nil {
		return nil, err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
663 664
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
665
	pkth := pkt.Header()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
666

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
667
	// XXX -> better PktHeader.Decode() ?
668 669
	pktLen := PktHeadLen + ntoh32(pkth.MsgLen) // .MsgLen is payload-only length without header
	if pktLen > MAX_PACKET_SIZE {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
670 671
		return nil, ErrPktTooBig
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
672

673 674
	// XXX -> pkt.Data = xbytes.Resize32(pkt.Data[:n], pktLen)
	if pktLen > uint32(cap(pkt.Data)) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
675
		// grow rxbuf
676
		rxbuf2 := make([]byte, pktLen)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
677 678
		copy(rxbuf2, pkt.Data[:n])
		pkt.Data = rxbuf2
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
679
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
680
	// cut .Data len to length of packet
681
	pkt.Data = pkt.Data[:pktLen]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
682

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
683 684 685 686 687 688 689
	// read rest of pkt data, if we need to
	if n < len(pkt.Data) {
		_, err = io.ReadFull(nl.peerLink, pkt.Data[n:])
		if err != nil {
			return nil, err
		}
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
690

691
	if /* XXX temp show only tx */ false && dumpio {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
692
		// XXX -> log
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
693
		fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
694 695 696
	}

	return pkt, nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
697 698 699
}


Kirill Smelkov's avatar
Kirill Smelkov committed
700 701
// ---- Handshake ----

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
702 703 704
// Handshake performs NEO protocol handshake just after raw connection between 2 nodes was established.
// On success raw connection is returned wrapped into NodeLink.
// On error raw connection is closed.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
705
func Handshake(ctx context.Context, conn net.Conn, role LinkRole) (nl *NodeLink, err error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
706 707 708 709 710 711
	err = handshake(ctx, conn, PROTOCOL_VERSION)
	if err != nil {
		return nil, err
	}

	// handshake ok -> NodeLink
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
712
	return newNodeLink(conn, role), nil
Kirill Smelkov's avatar
Kirill Smelkov committed
713 714
}

715 716 717
// HandshakeError is returned when there is an error while performing handshake.
//
// It carries both endpoints of the connection on which handshake failed
// together with the underlying error.
type HandshakeError struct {
	// XXX just keep .Conn? (but .Conn can be closed)
	LocalAddr  net.Addr
	RemoteAddr net.Addr
	Err        error
}

// Error formats the error as "<local> - <remote>: handshake: <cause>".
func (e *HandshakeError) Error() string {
	endpoints := fmt.Sprintf("%s - %s", e.LocalAddr, e.RemoteAddr)
	return endpoints + ": handshake: " + e.Err.Error()
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
727
func handshake(ctx context.Context, conn net.Conn, version uint32) (err error) {
Kirill Smelkov's avatar
Kirill Smelkov committed
728 729
	errch := make(chan error, 2)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
730 731 732
	// tx handshake word
	txWg := sync.WaitGroup{}
	txWg.Add(1)
Kirill Smelkov's avatar
Kirill Smelkov committed
733 734
	go func() {
		var b [4]byte
735
		binary.BigEndian.PutUint32(b[:], version) // XXX -> hton32 ?
Kirill Smelkov's avatar
Kirill Smelkov committed
736 737 738
		_, err := conn.Write(b[:])
		// XXX EOF -> ErrUnexpectedEOF ?
		errch <- err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
739
		txWg.Done()
Kirill Smelkov's avatar
Kirill Smelkov committed
740 741
	}()

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
742 743

	// rx handshake word
Kirill Smelkov's avatar
Kirill Smelkov committed
744 745 746 747 748 749 750 751 752 753 754 755 756 757 758
	go func() {
		var b [4]byte
		_, err := io.ReadFull(conn, b[:])
		if err == io.EOF {
			err = io.ErrUnexpectedEOF // can be returned with n = 0
		}
		if err == nil {
			peerVersion := binary.BigEndian.Uint32(b[:]) // XXX -> ntoh32 ?
			if peerVersion != version {
				err = fmt.Errorf("protocol version mismatch: peer = %08x  ; our side = %08x", peerVersion, version)
			}
		}
		errch <- err
	}()

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
759 760 761 762 763 764 765 766 767
	connClosed := false
	defer func() {
		// make sure our version is always sent on the wire, if possible,
		// so that peer does not see just closed connection when on rx we see version mismatch
		//
		// NOTE if cancelled tx goroutine will wake up without delay
		txWg.Wait()

		// don't forget to close conn if returning with error + add handshake err context
Kirill Smelkov's avatar
Kirill Smelkov committed
768
		if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
769 770 771 772 773 774 775 776 777 778 779
			err = &HandshakeError{conn.LocalAddr(), conn.RemoteAddr(), err}
			if !connClosed {
				conn.Close()
			}
		}
	}()


	for i := 0; i < 2; i++ {
		select {
		case <-ctx.Done():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
780
			conn.Close() // interrupt IO
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
781 782 783 784 785 786 787
			connClosed = true
			return ctx.Err()

		case err = <-errch:
			if err != nil {
				return err
			}
Kirill Smelkov's avatar
Kirill Smelkov committed
788 789 790
		}
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
791
	// handshaked ok
Kirill Smelkov's avatar
Kirill Smelkov committed
792 793 794
	return nil
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
795

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
796
// ---- Dial & Listen at raw NodeLink level ----
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
797

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
798 799
// DialLink connects to address on given network, handshakes and wraps the connection as NodeLink
func DialLink(ctx context.Context, net xnet.Networker, addr string) (nl *NodeLink, err error) {
800
	peerConn, err := net.Dial(ctx, addr)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
801 802 803
	if err != nil {
		return nil, err
	}
Kirill Smelkov's avatar
Kirill Smelkov committed
804

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
805
	return Handshake(ctx, peerConn, LinkClient)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
806 807
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
808
// ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
809
// The listener accepts only those connections that pass handshake.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
810
func ListenLink(net xnet.Networker, laddr string) (LinkListener, error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
811 812 813 814 815
	rawl, err := net.Listen(laddr)
	if err != nil {
		return nil, err
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
816
	l := &linkListener{
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
817
		l:        rawl,
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
818
		acceptq:  make(chan linkAccepted),
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
819 820 821 822 823 824 825
		closed:   make(chan struct{}),
	}
	go l.run()

	return l, nil
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
826 827
// LinkListener is net.Listener adapted to return handshaked NodeLink on Accept.
type LinkListener interface {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
828 829 830
	// from net.Listener:
	Close() error
	Addr() net.Addr
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
831 832 833 834 835 836 837

	// Accept returns new incoming connection wrapped into NodeLink.
	// It accepts only those connections which pass handshake.
	Accept() (*NodeLink, error)
}

type linkListener struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
838
	l       net.Listener
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
839
	acceptq chan linkAccepted
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
840 841 842
	closed  chan struct {}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
843
type linkAccepted struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
844 845 846 847
	link  *NodeLink
	err   error
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
848
func (l *linkListener) Close() error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
849
	err := l.l.Close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
850 851 852 853
	close(l.closed)
	return err
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
854
func (l *linkListener) run() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
855 856 857 858 859 860 861 862 863 864 865 866
	// context that cancels when listener stops
	runCtx, runCancel := context.WithCancel(context.Background())
	defer runCancel()

	for {
		// stop on close
		select {
		case <-l.closed:
			return
		default:
		}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
867
		// XXX add backpressure on too much incoming connections without client .Accept ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
868
		conn, err := l.l.Accept()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
869 870 871 872
		go l.accept(runCtx, conn, err)
	}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
873
func (l *linkListener) accept(ctx context.Context, conn net.Conn, err error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
874 875 876
	link, err := l.accept1(ctx, conn, err)

	select {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
877
	case l.acceptq <- linkAccepted{link, err}:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
878 879 880 881 882 883 884
		// ok

	case <-l.closed:
		// shutdown
		if link != nil {
			link.Close()
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
885 886 887
	}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
888
func (l *linkListener) accept1(ctx context.Context, conn net.Conn, err error) (*NodeLink, error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
889 890
	// XXX err ctx?

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
891 892 893
	if err != nil {
		return nil, err
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
894

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
895
	// NOTE Handshake closes conn in case of failure
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
896 897 898 899 900 901 902 903
	link, err := Handshake(ctx, conn, LinkServer)
	if err != nil {
		return nil, err
	}

	return link, nil
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
904
func (l *linkListener) Accept() (*NodeLink, error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
905 906 907
	select{
	case <-l.closed:
		// we know raw listener is already closed - return proper error about it
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
908
		_, err := l.l.Accept()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
909 910 911 912 913 914
		return nil, err

	case a := <-l.acceptq:
		return a.link, a.err
	}
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
915

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
916
func (l *linkListener) Addr() net.Addr {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
917 918 919
	return l.l.Addr()
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946
/*
XXX do if this is needed in a second place besides talkMaster1
// ---- Listen for single Conn over NodeLink use-cases ----

// XXX
func ListenSingleConn(link *NodeLink) ConnListener {
	l := &listen1conn{link}
	// XXX go ...
	return l
}

// ConnListener XXX ...
type ConnListener interface {
	// XXX +Close, Addr ?

	// Accept returns new connection multiplexed over NodeLink
	Accept() (*Conn, error)
}

type listen1conn struct {
	link *NodeLink
}

func ...
*/


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969
// ---- for convenience: Conn -> NodeLink & local/remote link addresses  ----

// LocalAddr returns the local address of the underlying link to peer.
func (nl *NodeLink) LocalAddr() net.Addr {
	link := nl.peerLink
	return link.LocalAddr()
}

// RemoteAddr returns the remote address of the underlying link to peer.
func (nl *NodeLink) RemoteAddr() net.Addr {
	link := nl.peerLink
	return link.RemoteAddr()
}

// Link returns the NodeLink over which this connection is multiplexed.
func (c *Conn) Link() *NodeLink {
	link := c.nodeLink
	return link
}

// ConnID returns the identifier of this connection.
func (c *Conn) ConnID() uint32 {
	id := c.connId
	return id
}


970
// ---- for convenience: String / Error ----
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
971
func (nl *NodeLink) String() string {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
972
	s := fmt.Sprintf("%s - %s", nl.LocalAddr(), nl.RemoteAddr())
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
973
	return s	// XXX add "(closed)" if nl is closed ?
974
			// XXX other flags e.g. (down) ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
975 976 977 978 979 980 981
}

// String returns the link description followed by " .<connection id>".
func (c *Conn) String() string {
	// XXX add "(closed)" if c is closed ?
	return fmt.Sprintf("%s .%d", c.nodeLink, c.connId)
}

982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002
// Error formats the error as "<link>: <operation>: <cause>".
func (e *LinkError) Error() string {
	text := fmt.Sprintf("%s: %s: %s", e.Link, e.Op, e.Err)
	return text
}

// Error formats the error as "<connection>: <operation>: <cause>".
func (e *ConnError) Error() string {
	text := fmt.Sprintf("%s: %s: %s", e.Conn, e.Op, e.Err)
	return text
}

// err wraps e into *LinkError with this link and operation op attached.
//
// nil is passed through as nil, so the result can be returned directly.
func (nl *NodeLink) err(op string, e error) error {
	if e != nil {
		return &LinkError{Link: nl, Op: op, Err: e}
	}
	return nil
}

// err wraps e into *ConnError with this connection and operation op attached.
//
// nil is passed through as nil, so the result can be returned directly.
func (c *Conn) err(op string, e error) error {
	if e != nil {
		return &ConnError{Conn: c, Op: op, Err: e}
	}
	return nil
}
Kirill Smelkov's avatar
Kirill Smelkov committed
1003 1004 1005 1006


// ---- exchange of messages ----

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1007 1008 1009
//trace:event traceConnRecv(c *Conn, msg Msg)
//trace:event traceConnSendPre(c *Conn, msg Msg)
// XXX do we also need traceConnSend?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1010

Kirill Smelkov's avatar
Kirill Smelkov committed
1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031
// Recv receives message
// it receives packet and decodes message from it
func (c *Conn) Recv() (Msg, error) {
	// TODO use freelist for PktBuf
	pkt, err := c.recvPkt()
	if err != nil {
		return nil, err
	}

	// decode packet
	pkth := pkt.Header()
	msgCode := ntoh16(pkth.MsgCode)
	msgType := msgTypeRegistry[msgCode]
	if msgType == nil {
		err = fmt.Errorf("invalid msgCode (%d)", msgCode)
		// XXX "decode" -> "recv: decode"?
		return nil, &ConnError{Conn: c, Op: "decode", Err: err}
	}

	// TODO use free-list for decoded messages + when possible decode in-place
	msg := reflect.New(msgType).Interface().(Msg)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1032
	_, err = msg.neoMsgDecode(pkt.Payload())
Kirill Smelkov's avatar
Kirill Smelkov committed
1033 1034 1035 1036
	if err != nil {
		return nil, &ConnError{Conn: c, Op: "decode", Err: err}
	}

1037
	traceConnRecv(c, msg)
Kirill Smelkov's avatar
Kirill Smelkov committed
1038 1039 1040 1041 1042 1043
	return msg, nil
}

// Send sends message
// it encodes message into packet and sends it
func (c *Conn) Send(msg Msg) error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1044
	traceConnSendPre(c, msg)
1045

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1046
	l := msg.neoMsgEncodedLen()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1047
	buf := PktBuf{make([]byte, PktHeadLen+l)} // TODO -> freelist
Kirill Smelkov's avatar
Kirill Smelkov committed
1048 1049 1050

	h := buf.Header()
	// h.ConnId will be set by conn.Send
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1051
	h.MsgCode = hton16(msg.neoMsgCode())
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1052
	h.MsgLen = hton32(uint32(l)) // XXX casting: think again
Kirill Smelkov's avatar
Kirill Smelkov committed
1053

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1054
	msg.neoMsgEncode(buf.Payload())
Kirill Smelkov's avatar
Kirill Smelkov committed
1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079

	// XXX why pointer?
	// XXX more context in err? (msg type)
	return c.sendPkt(&buf)
}


// Expect receives message and checks it is one of expected types
//
// if verification is successful the message is decoded inplace and returned
// which indicates index of received message.
//
// on error (-1, err) is returned
func (c *Conn) Expect(msgv ...Msg) (which int, err error) {
	// XXX a bit dup wrt Recv
	// TODO use freelist for PktBuf
	pkt, err := c.recvPkt()
	if err != nil {
		return -1, err
	}

	pkth := pkt.Header()
	msgCode := ntoh16(pkth.MsgCode)

	for i, msg := range msgv {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1080 1081
		if msg.neoMsgCode() == msgCode {
			_, err = msg.neoMsgDecode(pkt.Payload())
Kirill Smelkov's avatar
Kirill Smelkov committed
1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098
			if err != nil {
				return -1, &ConnError{Conn: c, Op: "decode", Err: err}
			}
			return i, nil
		}
	}

	// unexpected message
	msgType := msgTypeRegistry[msgCode]
	if msgType == nil {
		return -1, &ConnError{c, "decode", fmt.Errorf("invalid msgCode (%d)", msgCode)}
	}

	// XXX also add which messages were expected ?
	return -1, &ConnError{c, "recv", fmt.Errorf("unexpected message: %v", msgType)}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1099
// Ask sends request and receives response.
Kirill Smelkov's avatar
Kirill Smelkov committed
1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119
// It expects response to be exactly of resp type and errors otherwise
// XXX clarify error semantic (when Error is decoded)
// XXX do the same as Expect wrt respv ?
func (c *Conn) Ask(req Msg, resp Msg) error {
	err := c.Send(req)
	if err != nil {
		return err
	}

	nerr := &Error{}
	which, err := c.Expect(resp, nerr)
	switch which {
	case 0:
		return nil
	case 1:
		return ErrDecode(nerr)
	}

	return err
}