Commit 94001fa9 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent de551085
......@@ -21,10 +21,8 @@ import (
"sync"
"unsafe"
"fmt"
//"fmt"
)
// XXX temp
var _ = fmt.Println
// NodeLink is a node-node link in NEO
//
......@@ -54,8 +52,8 @@ type NodeLink struct {
connTab map[uint32]*Conn // connId -> Conn associated with connId
nextConnId uint32 // next connId to use for Conn initiated by us
serveWg sync.WaitGroup
serveWg sync.WaitGroup // for serve{Send,Recv}
//handleWg sync.WaitGroup // for spawned handlers XXX do we need this + .Wait() ?
handleNewConn func(conn *Conn) // handler for new connections
txreq chan txReq // tx requests from Conns go via here
......@@ -280,8 +278,6 @@ func (nl *NodeLink) serveRecv() {
if handleNewConn != nil {
conn = nl.newConn(connId)
}
// FIXME update connTab with born conn
}
nl.connMu.Unlock()
......@@ -422,7 +418,7 @@ func (c *Conn) close() {
// Close connection
// Any blocked Send() or Recv() will be unblocked and return error
// XXX Send() - if started - will first complete (not to break framing) XXX <- in background
func (c *Conn) Close() error { // XXX do we need error here?
func (c *Conn) Close() error {
// adjust nodeLink.connTab
c.nodeLink.connMu.Lock()
delete(c.nodeLink.connTab, c.connId)
......
......@@ -260,10 +260,11 @@ func TestNodeLink(t *testing.T) {
xwait(wg)
xclose(c11)
xclose(c12)
xclose(nl2) // for completeness
xclose(nl2)
// Conn accept + exchange
nl1, nl2 = nodeLinkPipe()
hdone := make(chan struct{})
nl2.HandleNewConn(func(c *Conn) {
// TODO raised err -> errch
pkt := xrecv(c)
......@@ -271,56 +272,74 @@ func TestNodeLink(t *testing.T) {
// change pkt a bit and send it back
xsend(c, mkpkt(34, []byte("pong")))
// one more time
pkt = xrecv(c)
xverifyPkt(pkt, c.connId, 35, []byte("ping2"))
xsend(c, mkpkt(36, []byte("pong2")))
xclose(c)
close(hdone)
})
c1 := nl1.NewConn()
pkt = mkpkt(33, []byte("ping"))
xsend(c1, pkt)
pkt2 := xrecv(c1)
xverifyPkt(pkt2, c1.connId, 34, []byte("pong"))
c = nl1.NewConn()
xsend(c, mkpkt(33, []byte("ping")))
pkt = xrecv(c)
xverifyPkt(pkt, c.connId, 34, []byte("pong"))
xsend(c, mkpkt(35, []byte("ping2")))
pkt = xrecv(c)
xverifyPkt(pkt, c.connId, 36, []byte("pong2"))
<-hdone
xclose(c)
xclose(nl1)
xclose(nl2)
// test 2 channels with replies comming in reversed time order
nl1, nl2 = nodeLinkPipe()
order := map[uint16]struct { // "order" in which to process requests
start chan int // processing starts when start chan is ready
next uint16 // after processing this switch to next
replyOrder := map[uint16]struct { // "order" in which to process requests
start chan struct{} // processing starts when start chan is ready
next uint16 // after processing this switch to next
}{
2: {make(chan int), 1},
1: {make(chan int), 0},
2: {make(chan struct{}), 1},
1: {make(chan struct{}), 0},
}
go func() {
order[2].start <- 0
}()
c1 = nl1.NewConn() // XXX temp?
c2 := nl1.NewConn()
close(replyOrder[2].start)
nl2.HandleNewConn(func(c *Conn) {
pkt := xrecv(c)
n := ntoh16(pkt.Header().MsgCode)
x := order[n]
x := replyOrder[n]
// wait before it is our turn & echo pkt back
<-x.start
xsend(c, pkt)
xclose(c)
// tell next it can start
if x.next != 0 {
order[x.next].start <- 0
close(replyOrder[x.next].start)
}
})
c1 := nl1.NewConn()
c2 := nl1.NewConn()
println("111")
xsend(c1, mkpkt(1, []byte("")))
println("222")
xsend(c2, mkpkt(2, []byte("")))
println("333")
wg = WorkGroup()
echoWait := func(c *Conn, msgCode uint16) func() {
return func() {
pkt := xrecv(c)
xverifyPkt(pkt, c.connId, msgCode, []byte(""))
}
// replies must be coming in reverse order
xechoWait := func(c *Conn, msgCode uint16) {
pkt := xrecv(c)
xverifyPkt(pkt, c.connId, msgCode, []byte(""))
}
wg.Gox(echoWait(c1, 1))
wg.Gox(echoWait(c2, 2))
xwait(wg)
println("aaa")
xechoWait(c2, 2)
println("bbb")
xechoWait(c1, 1)
println("ccc")
xclose(c1)
xclose(c2)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment