Commit df89fa96 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent deaca577
...@@ -168,13 +168,9 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) { ...@@ -168,13 +168,9 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) {
return _nodeLinkPipe(0, 0) return _nodeLinkPipe(0, 0)
} }
// XXX temp for cluster_test.go
// var NodeLinkPipe = nodeLinkPipe
func TestNodeLink(t *testing.T) { func TestNodeLink(t *testing.T) {
// TODO catch exception -> add proper location from it -> t.Fatal (see git-backup) // TODO catch exception -> add proper location from it -> t.Fatal (see git-backup)
//println("000")
// Close vs recvPkt // Close vs recvPkt
nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg := &errgroup.Group{} wg := &errgroup.Group{}
...@@ -232,7 +228,6 @@ func TestNodeLink(t *testing.T) { ...@@ -232,7 +228,6 @@ func TestNodeLink(t *testing.T) {
} }
xclose(nl1) xclose(nl1)
//println("111")
// Close vs recvPkt on another side // Close vs recvPkt on another side
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &errgroup.Group{} wg = &errgroup.Group{}
...@@ -262,8 +257,6 @@ func TestNodeLink(t *testing.T) { ...@@ -262,8 +257,6 @@ func TestNodeLink(t *testing.T) {
xwait(wg) xwait(wg)
xclose(nl1) xclose(nl1)
//println("222")
// raw exchange // raw exchange
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
...@@ -294,8 +287,6 @@ func TestNodeLink(t *testing.T) { ...@@ -294,8 +287,6 @@ func TestNodeLink(t *testing.T) {
xwait(wg) xwait(wg)
xwait(wgclose) xwait(wgclose)
//println("333")
// ---- connections on top of nodelink ---- // ---- connections on top of nodelink ----
// Close vs recvPkt // Close vs recvPkt
...@@ -314,8 +305,6 @@ func TestNodeLink(t *testing.T) { ...@@ -314,8 +305,6 @@ func TestNodeLink(t *testing.T) {
xclose(nl1) xclose(nl1)
xclose(nl2) xclose(nl2)
//println("444")
// Close vs sendPkt // Close vs sendPkt
nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
c = xnewconn(nl1) c = xnewconn(nl1)
...@@ -331,8 +320,6 @@ func TestNodeLink(t *testing.T) { ...@@ -331,8 +320,6 @@ func TestNodeLink(t *testing.T) {
} }
xwait(wg) xwait(wg)
//println("555")
// NodeLink.Close vs Conn.sendPkt/recvPkt // NodeLink.Close vs Conn.sendPkt/recvPkt
c11 := xnewconn(nl1) c11 := xnewconn(nl1)
c12 := xnewconn(nl1) c12 := xnewconn(nl1)
...@@ -357,8 +344,6 @@ func TestNodeLink(t *testing.T) { ...@@ -357,8 +344,6 @@ func TestNodeLink(t *testing.T) {
xclose(c12) xclose(c12)
xclose(nl2) xclose(nl2)
//println(600)
// NodeLink.Close vs Conn.sendPkt/recvPkt and Accept on another side // NodeLink.Close vs Conn.sendPkt/recvPkt and Accept on another side
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, 0) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, 0)
c21 := xnewconn(nl2) c21 := xnewconn(nl2)
...@@ -396,7 +381,6 @@ func TestNodeLink(t *testing.T) { ...@@ -396,7 +381,6 @@ func TestNodeLink(t *testing.T) {
xclose(nl1) xclose(nl1)
xwait(wg) xwait(wg)
//println(777)
// XXX denoise vvv // XXX denoise vvv
// NewConn after NodeLink shutdown // NewConn after NodeLink shutdown
...@@ -462,8 +446,6 @@ func TestNodeLink(t *testing.T) { ...@@ -462,8 +446,6 @@ func TestNodeLink(t *testing.T) {
t.Fatalf("Accept after NodeLink close: %v", err) t.Fatalf("Accept after NodeLink close: %v", err)
} }
//println(888)
xclose(c21) xclose(c21)
xclose(c22) xclose(c22)
// recvPkt/sendPkt error after Close & NodeLink shutdown // recvPkt/sendPkt error after Close & NodeLink shutdown
...@@ -480,8 +462,6 @@ func TestNodeLink(t *testing.T) { ...@@ -480,8 +462,6 @@ func TestNodeLink(t *testing.T) {
saveKeepClosed := connKeepClosed saveKeepClosed := connKeepClosed
connKeepClosed = 10*time.Millisecond connKeepClosed = 10*time.Millisecond
//println(999)
//println()
// Conn accept + exchange // Conn accept + exchange
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
...@@ -505,42 +485,28 @@ func TestNodeLink(t *testing.T) { ...@@ -505,42 +485,28 @@ func TestNodeLink(t *testing.T) {
xclose(c) xclose(c)
closed <- 1 closed <- 1
//println("X ααα")
// once again as ^^^ but finish only with CloseRecv // once again as ^^^ but finish only with CloseRecv
c2 := xaccept(nl2) c2 := xaccept(nl2)
//println("X ααα + 1")
pkt = xrecvPkt(c2) pkt = xrecvPkt(c2)
//println("X ααα + 2")
xverifyPkt(pkt, c2.connId, 41, []byte("ping5")) xverifyPkt(pkt, c2.connId, 41, []byte("ping5"))
xsendPkt(c2, c2.mkpkt(42, []byte("pong5"))) xsendPkt(c2, c2.mkpkt(42, []byte("pong5")))
//println("X βββ")
c2.CloseRecv() c2.CloseRecv()
closed <- 2 closed <- 2
//println("X γγγ")
// "connection refused" when trying to connect to not-listening peer // "connection refused" when trying to connect to not-listening peer
c = xnewconn(nl2) // XXX should get error here? c = xnewconn(nl2) // XXX should get error here?
xsendPkt(c, c.mkpkt(38, []byte("pong3"))) xsendPkt(c, c.mkpkt(38, []byte("pong3")))
//println("X γγγ + 1")
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
//println("X γγγ + 2")
xverifyPktMsg(pkt, c.connId, errConnRefused) xverifyPktMsg(pkt, c.connId, errConnRefused)
xsendPkt(c, c.mkpkt(40, []byte("pong4"))) // once again xsendPkt(c, c.mkpkt(40, []byte("pong4"))) // once again
//println("X γγγ + 3")
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
//println("X γγγ + 4")
xverifyPktMsg(pkt, c.connId, errConnRefused) xverifyPktMsg(pkt, c.connId, errConnRefused)
//println("X zzz")
xclose(c) xclose(c)
}) })
//println("aaa")
c1 := xnewconn(nl1) c1 := xnewconn(nl1)
xsendPkt(c1, c1.mkpkt(33, []byte("ping"))) xsendPkt(c1, c1.mkpkt(33, []byte("ping")))
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
...@@ -549,22 +515,16 @@ func TestNodeLink(t *testing.T) { ...@@ -549,22 +515,16 @@ func TestNodeLink(t *testing.T) {
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
xverifyPkt(pkt, c1.connId, 36, []byte("pong2")) xverifyPkt(pkt, c1.connId, 36, []byte("pong2"))
//println("111")
// "connection closed" after peer closed its end // "connection closed" after peer closed its end
<-closed <-closed
//println("111 + closed")
xsendPkt(c1, c1.mkpkt(37, []byte("ping3"))) xsendPkt(c1, c1.mkpkt(37, []byte("ping3")))
//println("111 + 1")
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
//println("111 + 2")
xverifyPktMsg(pkt, c1.connId, errConnClosed) xverifyPktMsg(pkt, c1.connId, errConnClosed)
xsendPkt(c1, c1.mkpkt(39, []byte("ping4"))) // once again xsendPkt(c1, c1.mkpkt(39, []byte("ping4"))) // once again
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
//println("111 + 4")
xverifyPktMsg(pkt, c1.connId, errConnClosed) xverifyPktMsg(pkt, c1.connId, errConnClosed)
// XXX also should get EOF on recv // XXX also should get EOF on recv
//println("222")
// one more time but now peer does only .CloseRecv() // one more time but now peer does only .CloseRecv()
c2 := xnewconn(nl1) c2 := xnewconn(nl1)
xsendPkt(c2, c2.mkpkt(41, []byte("ping5"))) xsendPkt(c2, c2.mkpkt(41, []byte("ping5")))
...@@ -575,9 +535,7 @@ func TestNodeLink(t *testing.T) { ...@@ -575,9 +535,7 @@ func TestNodeLink(t *testing.T) {
pkt = xrecvPkt(c2) pkt = xrecvPkt(c2)
xverifyPktMsg(pkt, c2.connId, errConnClosed) xverifyPktMsg(pkt, c2.connId, errConnClosed)
//println("333 z")
xwait(wg) xwait(wg)
//println("444")
// make sure entry for closed nl2.1 stays in nl2.connTab // make sure entry for closed nl2.1 stays in nl2.connTab
nl2.connMu.Lock() nl2.connMu.Lock()
...@@ -594,16 +552,12 @@ func TestNodeLink(t *testing.T) { ...@@ -594,16 +552,12 @@ func TestNodeLink(t *testing.T) {
} }
nl2.connMu.Unlock() nl2.connMu.Unlock()
//println("bbb")
xclose(c1) xclose(c1)
xclose(c2) xclose(c2)
xclose(nl1) xclose(nl1)
xclose(nl2) xclose(nl2)
connKeepClosed = saveKeepClosed connKeepClosed = saveKeepClosed
//println("\nsss")
// test 2 channels with replies coming in reversed time order // test 2 channels with replies coming in reversed time order
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
wg = &errgroup.Group{} wg = &errgroup.Group{}
...@@ -691,7 +645,6 @@ func xverifyMsg(msg1, msg2 proto.Msg) { ...@@ -691,7 +645,6 @@ func xverifyMsg(msg1, msg2 proto.Msg) {
} }
func TestRecv1Mode(t *testing.T) { func TestRecv1Mode(t *testing.T) {
//println("000")
// Send1 // Send1
nl1, nl2 := nodeLinkPipe() nl1, nl2 := nodeLinkPipe()
wg := &errgroup.Group{} wg := &errgroup.Group{}
...@@ -703,21 +656,15 @@ func TestRecv1Mode(t *testing.T) { ...@@ -703,21 +656,15 @@ func TestRecv1Mode(t *testing.T) {
} }
}() }()
//println("X aaa")
c := xaccept(nl2) c := xaccept(nl2)
//println("X aaa + 1")
msg := xRecv(c) msg := xRecv(c)
//println("X aaa + 2")
xverifyMsg(msg, &proto.Ping{}) xverifyMsg(msg, &proto.Ping{})
xSend(c, &proto.Pong{}) xSend(c, &proto.Pong{})
//println("X aaa + 3")
msg = xRecv(c) msg = xRecv(c)
//println("X aaa + 4")
xverifyMsg(msg, errConnClosed) xverifyMsg(msg, errConnClosed)
xclose(c) xclose(c)
sync <- 1 sync <- 1
//println("X zzz")
c = xaccept(nl2) c = xaccept(nl2)
msg = xRecv(c) msg = xRecv(c)
...@@ -725,38 +672,27 @@ func TestRecv1Mode(t *testing.T) { ...@@ -725,38 +672,27 @@ func TestRecv1Mode(t *testing.T) {
xverifyMsg(msg, &proto.Error{proto.ACK, "hello up there"}) xverifyMsg(msg, &proto.Error{proto.ACK, "hello up there"})
xSend(c, &proto.Error{proto.ACK, "hello to you too"}) xSend(c, &proto.Error{proto.ACK, "hello to you too"})
msg = xRecv(c) msg = xRecv(c)
//println("X zzz + 2")
xverifyMsg(msg, errConnClosed) xverifyMsg(msg, errConnClosed)
//println("X zzz + 3")
xclose(c) xclose(c)
}) })
//println("aaa")
xSend1(nl1, &proto.Ping{}) xSend1(nl1, &proto.Ping{})
// before next Send1 wait till peer receives errConnClosed from us // before next Send1 wait till peer receives errConnClosed from us
// otherwise peer could be already in accept while our errConnClosed is received // otherwise peer could be already in accept while our errConnClosed is received
// and there is only one receiving thread there ^^^ // and there is only one receiving thread there ^^^
<-sync <-sync
//println("bbb")
xSend1(nl1, &proto.Error{proto.ACK, "hello up there"}) xSend1(nl1, &proto.Error{proto.ACK, "hello up there"})
//println("ccc")
xwait(wg) xwait(wg)
//println("111\n")
// Recv1: further packets with same connid are rejected with "connection closed" // Recv1: further packets with same connid are rejected with "connection closed"
wg = &errgroup.Group{} wg = &errgroup.Group{}
gox(wg, func() { gox(wg, func() {
c := xnewconn(nl2) c := xnewconn(nl2)
//println("aaa")
xSend(c, &proto.Ping{}) xSend(c, &proto.Ping{})
//println("bbb")
xSend(c, &proto.Ping{}) xSend(c, &proto.Ping{})
//println("ccc")
msg := xRecv(c) msg := xRecv(c)
//println("ddd")
xverifyMsg(msg, errConnClosed) xverifyMsg(msg, errConnClosed)
}) })
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment