Commit deaca577 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent bc8bd0e0
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
// exchange NEO protocol messages. // exchange NEO protocol messages.
// //
// New connections can be created with link.NewConn(). Once connection is // New connections can be created with link.NewConn(). Once connection is
// created and message is sent over it, on peer's side another corresponding // created and a message is sent over it, on peer's side another corresponding
// new connection can be accepted via link.Accept(), and all further communication // new connection can be accepted via link.Accept(), and all further communication
// send/receive exchange will be happening in between those 2 connections. // send/receive exchange will be happening in between those 2 connections.
// //
...@@ -43,6 +43,24 @@ ...@@ -43,6 +43,24 @@
// XXX document // XXX document
package neonet package neonet
// XXX neonet compatibility with NEO/py depends on the following small NEO/py patch:
//
// https://lab.nexedi.com/kirr/neo/commit/dd3bb8b4
//
// which adjusts message ID a bit so it behaves like stream_id in HTTP/2:
//
// - always even for server initiated streams
// - always odd for client initiated streams
//
// and is incremented by += 2, instead of += 1 to maintain above invariant.
//
// See http://navytux.spb.ru/~kirr/neo.html#development-overview (starting from
// "Then comes the link layer which provides service to exchange messages over
// network...") for the rationale.
//
// Unfortunately current NEO/py maintainer is very much against merging that patch.
//go:generate gotrace gen . //go:generate gotrace gen .
import ( import (
...@@ -163,7 +181,6 @@ type Conn struct { ...@@ -163,7 +181,6 @@ type Conn struct {
txdownOnce sync.Once // tx shutdown may be called by both Close and nodelink.shutdown txdownOnce sync.Once // tx shutdown may be called by both Close and nodelink.shutdown
txclosed atomic32 // whether CloseSend was called txclosed atomic32 // whether CloseSend was called
// closing Conn is shutdown + some cleanup work to remove it from // closing Conn is shutdown + some cleanup work to remove it from
// link.connTab including arming timers etc. Let this work be spawned only once. // link.connTab including arming timers etc. Let this work be spawned only once.
// (for Conn.Close to be valid called several times) // (for Conn.Close to be valid called several times)
...@@ -176,18 +193,14 @@ var ErrLinkNoListen = errors.New("node link is not listening for incoming connec ...@@ -176,18 +193,14 @@ var ErrLinkNoListen = errors.New("node link is not listening for incoming connec
var ErrLinkManyConn = errors.New("too many opened connections") var ErrLinkManyConn = errors.New("too many opened connections")
var ErrClosedConn = errors.New("connection is closed") var ErrClosedConn = errors.New("connection is closed")
// LinkError is returned by NodeLink operations // LinkError is returned by NodeLink operations.
//
// XXX -> when error scope is whole link ?
type LinkError struct { type LinkError struct {
Link *NodeLink Link *NodeLink
Op string Op string
Err error Err error
} }
// ConnError is returned by Conn operations // ConnError is returned by Conn operations.
//
// XXX -> when error scope is connection ?
type ConnError struct { type ConnError struct {
Link *NodeLink Link *NodeLink
ConnId uint32 // NOTE Conn's are reused - cannot use *Conn here ConnId uint32 // NOTE Conn's are reused - cannot use *Conn here
...@@ -201,14 +214,14 @@ type ConnError struct { ...@@ -201,14 +214,14 @@ type ConnError struct {
type _LinkRole int type _LinkRole int
const ( const (
_LinkServer _LinkRole = iota // link created as server _LinkServer _LinkRole = iota // link created as server
_LinkClient // link created as client _LinkClient // link created as client
// for testing: // for testing:
linkNoRecvSend _LinkRole = 1 << 16 // do not spawn serveRecv & serveSend linkNoRecvSend _LinkRole = 1 << 16 // do not spawn serveRecv & serveSend
linkFlagsMask _LinkRole = (1<<32 - 1) << 16 linkFlagsMask _LinkRole = (1<<32 - 1) << 16
) )
// newNodeLink makes a new NodeLink from already established net.Conn // newNodeLink makes a new NodeLink from already established net.Conn .
// //
// Role specifies how to treat our role on the link - either as client or // Role specifies how to treat our role on the link - either as client or
// server. The difference in between client and server roles is in: // server. The difference in between client and server roles is in:
...@@ -251,7 +264,7 @@ func newNodeLink(conn net.Conn, role _LinkRole) *NodeLink { ...@@ -251,7 +264,7 @@ func newNodeLink(conn net.Conn, role _LinkRole) *NodeLink {
return nl return nl
} }
// connPool is freelist for Conn // connPool is freelist for Conn.
// XXX make it per-link? // XXX make it per-link?
var connPool = sync.Pool{New: func() interface{} { var connPool = sync.Pool{New: func() interface{} {
return &Conn{ return &Conn{
...@@ -262,7 +275,7 @@ var connPool = sync.Pool{New: func() interface{} { ...@@ -262,7 +275,7 @@ var connPool = sync.Pool{New: func() interface{} {
} }
}} }}
// connAlloc allocates Conn from freelist // connAlloc allocates Conn from freelist.
func (link *NodeLink) connAlloc(connId uint32) *Conn { func (link *NodeLink) connAlloc(connId uint32) *Conn {
c := connPool.Get().(*Conn) c := connPool.Get().(*Conn)
c.reinit() c.reinit()
...@@ -271,13 +284,13 @@ func (link *NodeLink) connAlloc(connId uint32) *Conn { ...@@ -271,13 +284,13 @@ func (link *NodeLink) connAlloc(connId uint32) *Conn {
return c return c
} }
// release releases connection to freelist // release releases connection to freelist.
func (c *Conn) release() { func (c *Conn) release() {
c.reinit() // XXX just in case c.reinit() // XXX just in case
connPool.Put(c) connPool.Put(c)
} }
// reinit reinitializes connection after reallocating it from freelist // reinit reinitializes connection after reallocating it from freelist.
func (c *Conn) reinit() { func (c *Conn) reinit() {
c.link = nil c.link = nil
c.connId = 0 c.connId = 0
...@@ -315,7 +328,7 @@ func ensureOpen(ch *chan struct{}) { ...@@ -315,7 +328,7 @@ func ensureOpen(ch *chan struct{}) {
} }
// newConn creates new Conn with id=connId and registers it into connTab. // newConn creates new Conn with id=connId and registers it into connTab.
// Must be called with connMu held. // must be called with connMu held.
func (link *NodeLink) newConn(connId uint32) *Conn { func (link *NodeLink) newConn(connId uint32) *Conn {
c := link.connAlloc(connId) c := link.connAlloc(connId)
link.connTab[connId] = c link.connTab[connId] = c
...@@ -369,7 +382,6 @@ func (link *NodeLink) shutdownAX() { ...@@ -369,7 +382,6 @@ func (link *NodeLink) shutdownAX() {
// drain all connections from .acceptq: // drain all connections from .acceptq:
// - something could be already buffered there // - something could be already buffered there
// - serveRecv could start writing acceptq at the same time we set axdownFlag; we derace it // - serveRecv could start writing acceptq at the same time we set axdownFlag; we derace it
for { for {
// if serveRecv is outside `.acceptq <- ...` critical // if serveRecv is outside `.acceptq <- ...` critical
// region and fully drained - we are done. // region and fully drained - we are done.
...@@ -413,6 +425,7 @@ func (link *NodeLink) shutdownAX() { ...@@ -413,6 +425,7 @@ func (link *NodeLink) shutdownAX() {
} }
// shutdown closes raw link to peer and marks NodeLink as no longer operational. // shutdown closes raw link to peer and marks NodeLink as no longer operational.
//
// it also shutdowns all opened connections over this node link. // it also shutdowns all opened connections over this node link.
func (nl *NodeLink) shutdown() { func (nl *NodeLink) shutdown() {
nl.shutdownAX() nl.shutdownAX()
...@@ -479,14 +492,14 @@ func (c *Conn) shutdown() { ...@@ -479,14 +492,14 @@ func (c *Conn) shutdown() {
c.shutdownRX(errConnClosed) c.shutdownRX(errConnClosed)
} }
// shutdownTX marks TX as no longer operational (?) and interrupts Send. // shutdownTX marks TX as no longer operational and interrupts Send.
func (c *Conn) shutdownTX() { func (c *Conn) shutdownTX() {
c.txdownOnce.Do(func() { c.txdownOnce.Do(func() {
close(c.txdown) close(c.txdown)
}) })
} }
// shutdownRX marks .rxq as no loner operational and interrupts Recv. // shutdownRX marks .rxq as no longer operational and interrupts Recv.
func (c *Conn) shutdownRX(errMsg *proto.Error) { func (c *Conn) shutdownRX(errMsg *proto.Error) {
c.rxdownOnce.Do(func() { c.rxdownOnce.Do(func() {
// close(c.rxdown) // wakeup Conn.Recv // close(c.rxdown) // wakeup Conn.Recv
...@@ -629,7 +642,7 @@ func (c *Conn) Close() error { ...@@ -629,7 +642,7 @@ func (c *Conn) Close() error {
// ---- receive ---- // ---- receive ----
// errAcceptShutdownAX returns appropriate error when link.axdown is found ready in Accept // errAcceptShutdownAX returns appropriate error when link.axdown is found ready in Accept.
func (link *NodeLink) errAcceptShutdownAX() error { func (link *NodeLink) errAcceptShutdownAX() error {
switch { switch {
case link.closed.Get() != 0: case link.closed.Get() != 0:
...@@ -639,7 +652,6 @@ func (link *NodeLink) errAcceptShutdownAX() error { ...@@ -639,7 +652,6 @@ func (link *NodeLink) errAcceptShutdownAX() error {
return ErrLinkNoListen return ErrLinkNoListen
default: default:
// XXX ok? - recheck
return ErrLinkDown return ErrLinkDown
} }
} }
...@@ -677,7 +689,7 @@ func (link *NodeLink) Accept() (*Conn, error) { ...@@ -677,7 +689,7 @@ func (link *NodeLink) Accept() (*Conn, error) {
return conn, err return conn, err
} }
// errRecvShutdown returns appropriate error when c.rxdown is found ready in recvPkt // errRecvShutdown returns appropriate error when c.rxdown is found ready in recvPkt.
func (c *Conn) errRecvShutdown() error { func (c *Conn) errRecvShutdown() error {
switch { switch {
case c.rxclosed.Get() != 0: case c.rxclosed.Get() != 0:
...@@ -755,7 +767,7 @@ func (nl *NodeLink) serveRecv() { ...@@ -755,7 +767,7 @@ func (nl *NodeLink) serveRecv() {
defer nl.serveWg.Done() defer nl.serveWg.Done()
for { for {
// receive 1 packet // receive 1 packet
// XXX if nl.peerLink was just closed by tx->shutdown we'll get ErrNetClosing // NOTE if nl.peerLink was just closed by tx->shutdown we'll get ErrNetClosing
pkt, err := nl.recvPkt() pkt, err := nl.recvPkt()
//fmt.Printf("\n%p recvPkt -> %v, %v\n", nl, pkt, err) //fmt.Printf("\n%p recvPkt -> %v, %v\n", nl, pkt, err)
if err != nil { if err != nil {
...@@ -776,8 +788,8 @@ func (nl *NodeLink) serveRecv() { ...@@ -776,8 +788,8 @@ func (nl *NodeLink) serveRecv() {
nl.connMu.Lock() nl.connMu.Lock()
// connTab is never nil here - because shutdown before // connTab is never nil here - because shutdown, before
// resetting it waits for us to finish. // resetting it, waits for us to finish.
conn := nl.connTab[connId] conn := nl.connTab[connId]
if conn == nil { if conn == nil {
...@@ -819,7 +831,7 @@ func (nl *NodeLink) serveRecv() { ...@@ -819,7 +831,7 @@ func (nl *NodeLink) serveRecv() {
//fmt.Printf("%p\tconn.rxdown: %v\taccept: %v\n", nl, rxdown, accept) //fmt.Printf("%p\tconn.rxdown: %v\taccept: %v\n", nl, rxdown, accept)
// conn exists but rx is down - "connection closed" // conn exists, but rx is down - "connection closed"
// (this cannot happen for newly accepted connection) // (this cannot happen for newly accepted connection)
if rxdown { if rxdown {
go nl.replyNoConn(connId, errConnClosed) go nl.replyNoConn(connId, errConnClosed)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment