Commit 98ff316b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 0b18819a
...@@ -51,28 +51,26 @@ import ( ...@@ -51,28 +51,26 @@ import (
type NodeLink struct { type NodeLink struct {
peerLink net.Conn // raw conn to peer peerLink net.Conn // raw conn to peer
connMu sync.Mutex // TODO -> RW ? connMu sync.Mutex
connTab map[uint32]*Conn // connId -> Conn associated with connId connTab map[uint32]*Conn // connId -> Conn associated with connId
nextConnId uint32 // next connId to use for Conn initiated by us nextConnId uint32 // next connId to use for Conn initiated by us
serveWg sync.WaitGroup // for serve{Send,Recv} serveWg sync.WaitGroup // for serve{Send,Recv}
acceptq chan *Conn // queue of incoming connections for Accept acceptq chan *Conn // queue of incoming connections for Accept
// = nil if NodeLink is not accepting connections // = nil if NodeLink is not accepting connections
txq chan txReq // tx requests from Conns go via here txq chan txReq // tx requests from Conns go via here
// (rx packets are routed to Conn.rxq)
down chan struct{} // ready when NodeLink is marked as no longer operational
downOnce sync.Once // shutdown may be due both Close and IO error
downWg sync.WaitGroup // for activities at shutdown
errClose error // error got from peerLink.Close on shutdown
errMu sync.Mutex errMu sync.Mutex
errRecv error // error got from recvPkt on shutdown errRecv error // error got from recvPkt on shutdown
// once because: NodeLink has to be explicitly closed by user; it can also closeCalled uint32 // whether Close was called
// be "closed" by IO errors on peerLink
closeOnce sync.Once
down chan struct{} // ready when NodeLink is marked as no longer operational
closeCalled uint32 // whether Close was called; ^^^ can be from IO error
shutdownWg sync.WaitGroup // for activities at shutdown
errClose error // error got from peerLink.Close
} }
...@@ -188,16 +186,16 @@ func (nl *NodeLink) NewConn() (*Conn, error) { ...@@ -188,16 +186,16 @@ func (nl *NodeLink) NewConn() (*Conn, error) {
// shutdown closes peerLink and marks NodeLink as no longer operational // shutdown closes peerLink and marks NodeLink as no longer operational
// it also shutdowns and all active Conns. // it also shutdowns and all active Conns.
func (nl *NodeLink) shutdown() { func (nl *NodeLink) shutdown() {
nl.closeOnce.Do(func() { nl.downOnce.Do(func() {
close(nl.down) close(nl.down)
// close actual link to peer. this will wakeup {send,recv}Pkt // close actual link to peer. this will wakeup {send,recv}Pkt
// NOTE we need it here so that e.g. aborting on error in serveSend wakes up serveRecv // NOTE we need it here so that e.g. aborting on error in serveSend wakes up serveRecv
nl.errClose = nl.peerLink.Close() nl.errClose = nl.peerLink.Close()
nl.shutdownWg.Add(1) nl.downWg.Add(1)
go func() { go func() {
defer nl.shutdownWg.Done() defer nl.downWg.Done()
// wait for serve{Send,Recv} to complete before shutting connections down // wait for serve{Send,Recv} to complete before shutting connections down
// //
...@@ -207,11 +205,11 @@ func (nl *NodeLink) shutdown() { ...@@ -207,11 +205,11 @@ func (nl *NodeLink) shutdown() {
nl.connMu.Lock() nl.connMu.Lock()
for _, conn := range nl.connTab { for _, conn := range nl.connTab {
// NOTE anything waking up on Conn.closed must not lock // NOTE anything waking up on Conn.down must not lock
// connMu - else it will deadlock. // connMu - else it will deadlock.
conn.shutdown() conn.shutdown()
} }
nl.connTab = nil // clear + mark closed nl.connTab = nil // clear + mark down
nl.connMu.Unlock() nl.connMu.Unlock()
}() }()
}) })
...@@ -222,7 +220,7 @@ func (nl *NodeLink) shutdown() { ...@@ -222,7 +220,7 @@ func (nl *NodeLink) shutdown() {
func (nl *NodeLink) Close() error { func (nl *NodeLink) Close() error {
atomic.StoreUint32(&nl.closeCalled, 1) atomic.StoreUint32(&nl.closeCalled, 1)
nl.shutdown() nl.shutdown()
nl.shutdownWg.Wait() nl.downWg.Wait()
return nl.errClose return nl.errClose
} }
...@@ -268,7 +266,7 @@ func (nl *NodeLink) Accept() (*Conn, error) { ...@@ -268,7 +266,7 @@ func (nl *NodeLink) Accept() (*Conn, error) {
} }
} }
// errRecvShutdown returns appropriate error when c.closed is found ready in Recv // errRecvShutdown returns appropriate error when c.down is found ready in Recv
func (c *Conn) errRecvShutdown() error { func (c *Conn) errRecvShutdown() error {
switch { switch {
case atomic.LoadUint32(&c.closeCalled) != 0: case atomic.LoadUint32(&c.closeCalled) != 0:
...@@ -335,8 +333,8 @@ func (nl *NodeLink) serveRecv() { ...@@ -335,8 +333,8 @@ func (nl *NodeLink) serveRecv() {
if nl.acceptq != nil { if nl.acceptq != nil {
// we are accepting new incoming connection // we are accepting new incoming connection
conn = nl.newConn(connId) conn = nl.newConn(connId)
// XXX what if Accept exited because of just recently close(nl.closed)? // XXX what if Accept exited because of just recently close(nl.down)?
// -> check nl.closed here too ? // -> check nl.down here too ?
nl.acceptq <- conn nl.acceptq <- conn
} }
} }
...@@ -349,12 +347,12 @@ func (nl *NodeLink) serveRecv() { ...@@ -349,12 +347,12 @@ func (nl *NodeLink) serveRecv() {
} }
// route packet to serving goroutine handler // route packet to serving goroutine handler
// XXX what if Conn.Recv exited because of just recently close(nl.closed) ? // XXX what if Conn.Recv exited because of just recently close(nl.down) ?
// -> check nl.closed here too ? // -> check nl.down here too ?
conn.rxq <- pkt conn.rxq <- pkt
// keep connMu locked until here: so that ^^^ `conn.rxq <- pkt` can be // keep connMu locked until here: so that ^^^ `conn.rxq <- pkt` can be
// sure conn stays not closed e.g. by Conn.Close // sure conn stays not down e.g. closed by Conn.Close
nl.connMu.Unlock() nl.connMu.Unlock()
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment