Commit 235c7466 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 9c1443ba
......@@ -24,6 +24,7 @@ import (
"io"
"net"
"sync"
"sync/atomic"
"fmt"
//"lab.nexedi.com/kirr/go123/xruntime/debug"
......@@ -63,12 +64,17 @@ type NodeLink struct {
errMu sync.Mutex
// errSend error // error got from sendPkt, if any
errRecv error // error got from recvPkt, if any
errClose error // error got from peerLink.Close
// once because: NodeLink has to be explicitly closed by user; it can also
// be "closed" by IO errors on peerLink
closeOnce sync.Once
closed chan struct{} // XXX text
closeCalled uint32 // whether Close was called; ^^^ can be from IO error
closeWg sync.WaitGroup // XXX for close waiter
errClose error // error got from peerLink.Close
}
// Conn is a connection established over NodeLink
......@@ -83,14 +89,18 @@ type Conn struct {
rxq chan *PktBuf // received packets for this Conn go here
txerr chan error // transmit errors for this Conn go back here
closed chan struct{} // whether Conn is marked as no longer operational
closeCalled uint32 // whether Close was called; ^^^ can be from IO error on node link
rxerrOnce sync.Once // XXX whether actual RX error was already reported to caller
// once because: Conn has to be explicitly closed by user; it can also
// be closed by NodeLink.Close .
closeOnce sync.Once
closed chan struct{}
}
// ErrLinkClosed is the error indicated for operations on closed NodeLink
var ErrLinkClosed = errors.New("node link is closed") // XXX -> read/write but also Accept ?
var ErrLinkStopped = errors.New("node link was stopped") // XXX due to IO errors?
var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
var ErrClosedConn = errors.New("read/write on closed connection")
......@@ -176,9 +186,22 @@ func (nl *NodeLink) NewConn() *Conn {
}
// close is worker for Close & friends.
// It marks all active Conns and NodeLink itself as closed.
// It marks NodeLink and all active Conns as closed.
func (nl *NodeLink) close() {
nl.closeOnce.Do(func() {
close(nl.closed)
// close actual link to peer. this will wakeup serve{Send,Recv}
// NOTE we need it here so that e.g. aborting on error serveSend wakes up serveRecv
nl.errClose = nl.peerLink.Close()
nl.closeWg.Add(1)
go func() {
defer nl.closeWg.Done()
// wait for serve{Send,Recv} to complete before signalling to Conns
nl.serveWg.Wait()
nl.connMu.Lock()
for _, conn := range nl.connTab {
// NOTE anything waking up on Conn.closed must not lock
......@@ -187,24 +210,16 @@ func (nl *NodeLink) close() {
}
nl.connTab = nil // clear + mark closed
nl.connMu.Unlock()
close(nl.closed)
// close actual link to peer. this will wakeup serve{Send,Recv}
// NOTE we need it here so that e.g. aborting on error serveSend wakes up serveRecv
nl.errMu.Lock()
nl.errClose = nl.peerLink.Close()
nl.errMu.Unlock()
}()
})
}
// Close closes node-node link.
// IO on connections established over it is automatically interrupted with an error.
func (nl *NodeLink) Close() error {
atomic.StoreUint32(&nl.closeCalled, 1)
nl.close()
// wait for serve{Send,Recv} to complete
nl.serveWg.Wait()
nl.closeWg.Wait() // wait for close to complete
return nl.errClose
}
......@@ -225,6 +240,8 @@ func (c *Conn) Close() error {
c.nodeLink.connMu.Lock()
delete(c.nodeLink.connTab, c.connId)
c.nodeLink.connMu.Unlock()
atomic.StoreUint32(&c.closeCalled, 1)
c.close()
return nil
}
......@@ -245,13 +262,37 @@ func (nl *NodeLink) Accept() (*Conn, error) {
}
}
// errRecvClosed returns appropriate error when c.closed is found ready in Recv
func (c *Conn) errRecvClosed() error {
switch {
case atomic.LoadUint32(&c.closeCalled) != 0:
return ErrClosedConn
case atomic.LoadUint32(&c.nodeLink.closeCalled) != 0:
return ErrLinkClosed
default:
// we have to check what was particular RX error on nodelink
// only do that once - after reportin RX error the first time
// tell client the node link is no longer operational.
var err error
c.rxerrOnce.Do(func() {
c.nodeLink.errMu.Lock()
err = c.nodeLink.errRecv
c.nodeLink.errMu.Unlock()
})
if err == nil {
err = ErrLinkStopped
}
return err
}
}
// Recv receives packet from connection
func (c *Conn) Recv() (*PktBuf, error) {
select {
case <-c.closed:
// XXX get err from c.nodeLink.recvErr
// XXX if nil -> ErrClosedConn ?
return nil, ErrClosedConn // XXX -> EOF ?
return nil, c.errRecvClosed()
case pkt := <-c.rxq: // XXX try to leave only pkt, ok := <-c.rxq
return pkt, nil // XXX error = ?
......@@ -271,12 +312,14 @@ func (nl *NodeLink) serveRecv() {
// on IO error framing over peerLink becomes broken
// so we mark node link and all connections as closed and stop service
/*
select {
case <-nl.closed:
// error was due to closing NodeLink
err = ErrLinkClosed
default:
}
*/
nl.errMu.Lock()
nl.errRecv = err
......@@ -329,6 +372,24 @@ type txReq struct {
errch chan error
}
// errSendClosed returns approproate error when c.closed is found ready in Send
func (c *Conn) errSendClosed() error {
switch {
case atomic.LoadUint32(&c.closeCalled) != 0:
return ErrClosedConn
// the only other error possible besides Conn being .Close()'ed is that
// NodeLink was closed/stopped itself - on actual IO problems corresponding
// error is delivered to particular Send that caused it.
case atomic.LoadUint32(&c.nodeLink.closeCalled) != 0:
return ErrLinkClosed
default:
return ErrLinkStopped
}
}
// Send sends packet via connection
func (c *Conn) Send(pkt *PktBuf) error {
// set pkt connId associated with this connection
......@@ -337,8 +398,7 @@ func (c *Conn) Send(pkt *PktBuf) error {
select {
case <-c.closed:
return ErrClosedConn
// return errClosedConn(c.nodeLink.sendErr) // XXX locking ?
return c.errSendClosed()
case c.nodeLink.txq <- txReq{pkt, c.txerr}:
select {
......@@ -359,15 +419,14 @@ func (c *Conn) Send(pkt *PktBuf) error {
// We still want to return actual transmission error to caller.
select {
case err = <-c.txerr:
return err
return err // XXX if nil ?
default:
return ErrClosedConn
// return errClosedConn(c.nodeLink.sendErr) // XXX locking ?
return c.errSendClosed()
}
case err = <-c.txerr:
//fmt.Printf("%v <- c.txerr\n", err)
return err
return err // XXX if nil ?
}
}
......@@ -378,21 +437,21 @@ func (c *Conn) Send(pkt *PktBuf) error {
// serially executes them over associated node link.
func (nl *NodeLink) serveSend() {
defer nl.serveWg.Done()
var err error
for {
select {
case <-nl.closed:
return
case txreq := <-nl.txq:
err = nl.sendPkt(txreq.pkt)
err := nl.sendPkt(txreq.pkt)
fmt.Printf("sendPkt -> %v\n", err)
if err != nil {
// on IO error framing over peerLink becomes broken
// so mark node link and all connections as closed and stop service
/*
if err != nil {
select {
case <-nl.closed:
// error was due to closing NodeLink
......@@ -400,6 +459,7 @@ func (nl *NodeLink) serveSend() {
default:
}
}
*/
txreq.errch <- err // XXX recheck wakeup logic for err case
......
......@@ -325,40 +325,89 @@ func TestNodeLink(t *testing.T) {
xclose(nl2)
*/
// NodeLink.Close vs Conn.Send/Recv on another side TODO
// NodeLink.Close vs Conn.Send/Recv on another side
nl1, nl2 := _nodeLinkPipe(0, linkNoRecvSend)
c11 := nl1.NewConn()
c12 := nl1.NewConn()
c13 := nl1.NewConn()
wg := WorkGroup()
var errRecv error
wg.Gox(func() {
println(">>> RECV START")
pkt, err := c11.Recv()
println(">>> recv wakeup")
if !(pkt == nil && err == ErrClosedConn) { // XXX -> EOF ?
want1 := io.EOF // if recvPkt wakes up due to peer close
want2 := io.ErrClosedPipe // if recvPkt wakes up due to sendPkt wakes up first and closes nl1
if !(pkt == nil && (err == want1 || err == want2)) {
exc.Raisef("Conn.Recv after peer NodeLink shutdown: pkt = %v err = %v", pkt, err)
}
println("recv ok")
errRecv = err
})
wg.Gox(func() {
pkt := &PktBuf{[]byte("data")}
println(">>> SEND START")
err := c12.Send(pkt)
println(">>> send wakeup")
if want := io.ErrClosedPipe; err != want {// XXX we are here but what the error should be?
exc.Raisef("Conn.Send() after peer NodeLink shutdown: unexpected err\nhave: %v\nwant: %v", err, want)
want := io.ErrClosedPipe // always this in both due to peer close or recvPkt waking up and closing nl1
if err != want {
exc.Raisef("Conn.Send after peer NodeLink shutdown: %v", err)
}
println(">>> SEND OK")
})
tdelay()
println("NL2.Close")
xclose(nl2)
xwait(wg)
// TODO check Recv/Send error on second call
// Recv/Send on another Conn
pkt, err := c13.Recv()
if !(pkt == nil && err == errRecv) {
t.Fatalf("Conn.Recv 2 after peer NodeLink shutdown: pkt = %v err = %v", pkt, err)
}
err = c13.Send(&PktBuf{[]byte("data")})
if err != ErrLinkStopped {
t.Fatalf("Conn.Send 2 after peer NodeLink shutdown: %v", err)
}
// Recv/Send error on second call
pkt, err = c11.Recv()
if !(pkt == nil && err == ErrLinkStopped) {
t.Fatalf("Conn.Recv after NodeLink stop: pkt = %v err = %v", pkt, err)
}
err = c12.Send(&PktBuf{[]byte("data")})
if err != ErrLinkStopped {
t.Fatalf("Conn.Send after NodeLink stop: %v", err)
}
xclose(c13)
// Recv/Send on closed Conn but not closed NodeLink
pkt, err = c13.Recv()
if !(pkt == nil && err == ErrClosedConn) {
t.Fatalf("Conn.Recv after close but only stopped NodeLink: pkt = %v err = %v", pkt, err)
}
err = c13.Send(&PktBuf{[]byte("data")})
if err != ErrClosedConn {
t.Fatalf("Conn.Send after close but only stopped NodeLink: %v", err)
}
xclose(nl1)
// Recv/Send error after NodeLink close
pkt, err = c11.Recv()
if !(pkt == nil && err == ErrLinkClosed) {
t.Fatalf("Conn.Recv after NodeLink stop: pkt = %v err = %v", pkt, err)
}
err = c12.Send(&PktBuf{[]byte("data")})
if err != ErrLinkClosed {
t.Fatalf("Conn.Send after NodeLink stop: %v", err)
}
xclose(c11)
xclose(c12)
// TODO check Recv/Send error after Close
xclose(nl1)
// check Recv/Send error after Close & NodeLink shutdown
pkt, err = c11.Recv()
if !(pkt == nil && err == ErrClosedConn) {
t.Fatalf("Conn.Recv after close and NodeLink close: pkt = %v err = %v", pkt, err)
}
err = c12.Send(&PktBuf{[]byte("data")})
if err != ErrClosedConn {
t.Fatalf("Conn.Send after close and NodeLink close: %v", err)
}
/*
// Conn accept + exchange
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment