Commit eb41c45f authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent edfc431e
...@@ -21,8 +21,10 @@ import ( ...@@ -21,8 +21,10 @@ import (
"sync" "sync"
"unsafe" "unsafe"
//"fmt" "fmt"
) )
// XXX temp
var _ = fmt.Println
// NodeLink is a node-node link in NEO // NodeLink is a node-node link in NEO
// //
...@@ -182,10 +184,13 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -182,10 +184,13 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
// first read to read pkt header and hopefully up to page of data in 1 syscall // first read to read pkt header and hopefully up to page of data in 1 syscall
pkt := &PktBuf{make([]byte, 4096)} pkt := &PktBuf{make([]byte, 4096)}
n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, PktHeadLen) // TODO reenable, but NOTE next packet can be also prefetched here
//n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, PktHeadLen)
n, err := io.ReadFull(nl.peerLink, pkt.Data[:PktHeadLen])
if err != nil { if err != nil {
return nil, err // XXX err adjust ? return nil, err // XXX err adjust ?
} }
//println("read head:", n)
pkth := pkt.Header() pkth := pkt.Header()
...@@ -212,6 +217,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -212,6 +217,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
if err != nil { if err != nil {
return nil, err // XXX err adjust ? return nil, err // XXX err adjust ?
} }
//println("read data:", len(pkt.Data)-n)
} }
return pkt, nil return pkt, nil
...@@ -264,6 +270,7 @@ func (nl *NodeLink) serveRecv() { ...@@ -264,6 +270,7 @@ func (nl *NodeLink) serveRecv() {
// pkt.ConnId -> Conn // pkt.ConnId -> Conn
connId := ntoh32(pkt.Header().ConnId) connId := ntoh32(pkt.Header().ConnId)
//fmt.Printf("%p\t (recv) -> connId: %v\n", nl, connId)
var handleNewConn func(conn *Conn) var handleNewConn func(conn *Conn)
nl.connMu.Lock() nl.connMu.Lock()
...@@ -273,6 +280,8 @@ func (nl *NodeLink) serveRecv() { ...@@ -273,6 +280,8 @@ func (nl *NodeLink) serveRecv() {
if handleNewConn != nil { if handleNewConn != nil {
conn = nl.newConn(connId) conn = nl.newConn(connId)
} }
// FIXME update connTab with born conn
} }
nl.connMu.Unlock() nl.connMu.Unlock()
...@@ -287,6 +296,7 @@ func (nl *NodeLink) serveRecv() { ...@@ -287,6 +296,7 @@ func (nl *NodeLink) serveRecv() {
if handleNewConn != nil { if handleNewConn != nil {
// TODO avoid spawning goroutine for each new Ask request - // TODO avoid spawning goroutine for each new Ask request -
// - by keeping pool of read inactive goroutine / conn pool ? // - by keeping pool of read inactive goroutine / conn pool ?
//println("+ handle", connId)
go handleNewConn(conn) go handleNewConn(conn)
} }
......
...@@ -93,7 +93,7 @@ func _mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf { ...@@ -93,7 +93,7 @@ func _mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf {
h := pkt.Header() h := pkt.Header()
h.ConnId = hton32(connid) h.ConnId = hton32(connid)
h.MsgCode = hton16(msgcode) h.MsgCode = hton16(msgcode)
h.Len = hton32(PktHeadLen + 4) h.Len = hton32(PktHeadLen + uint32(len(payload)))
copy(pkt.Payload(), payload) copy(pkt.Payload(), payload)
return pkt return pkt
} }
...@@ -263,7 +263,6 @@ func TestNodeLink(t *testing.T) { ...@@ -263,7 +263,6 @@ func TestNodeLink(t *testing.T) {
xclose(nl2) // for completeness xclose(nl2) // for completeness
// Conn accept + exchange // Conn accept + exchange
println("000")
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
nl2.HandleNewConn(func(c *Conn) { nl2.HandleNewConn(func(c *Conn) {
// TODO raised err -> errch // TODO raised err -> errch
...@@ -281,7 +280,7 @@ func TestNodeLink(t *testing.T) { ...@@ -281,7 +280,7 @@ func TestNodeLink(t *testing.T) {
xverifyPkt(pkt2, c1.connId, 34, []byte("pong")) xverifyPkt(pkt2, c1.connId, 34, []byte("pong"))
// test 2 channels with replies comming in reversed time order // test 2 channels with replies comming in reversed time order
println("111") nl1, nl2 = nodeLinkPipe()
order := map[uint16]struct { // "order" in which to process requests order := map[uint16]struct { // "order" in which to process requests
start chan int // processing starts when start chan is ready start chan int // processing starts when start chan is ready
next uint16 // after processing this switch to next next uint16 // after processing this switch to next
...@@ -292,7 +291,7 @@ func TestNodeLink(t *testing.T) { ...@@ -292,7 +291,7 @@ func TestNodeLink(t *testing.T) {
go func() { go func() {
order[2].start <- 0 order[2].start <- 0
}() }()
println("aaa") c1 = nl1.NewConn() // XXX temp?
c2 := nl1.NewConn() c2 := nl1.NewConn()
nl2.HandleNewConn(func(c *Conn) { nl2.HandleNewConn(func(c *Conn) {
pkt := xrecv(c) pkt := xrecv(c)
...@@ -311,26 +310,17 @@ func TestNodeLink(t *testing.T) { ...@@ -311,26 +310,17 @@ func TestNodeLink(t *testing.T) {
xsend(c1, mkpkt(1, []byte(""))) xsend(c1, mkpkt(1, []byte("")))
xsend(c2, mkpkt(2, []byte(""))) xsend(c2, mkpkt(2, []byte("")))
println("bbb")
wg = WorkGroup() wg = WorkGroup()
echoTest := func(c *Conn, msgCode uint16) func() { echoWait := func(c *Conn, msgCode uint16) func() {
return func() { return func() {
/* pkt := xrecv(c)
pkt := mkpkt(msgCode, []byte("")) xverifyPkt(pkt, c.connId, msgCode, []byte(""))
xsend(c, pkt)
*/
pkt2 := xrecv(c)
println("recv", msgCode)
xverifyPkt(pkt2, c.connId, msgCode, []byte(""))
} }
} }
wg.Gox(echoTest(c1, 1)) wg.Gox(echoWait(c1, 1))
wg.Gox(echoTest(c2, 2)) wg.Gox(echoWait(c2, 2))
println("ccc")
xwait(wg) xwait(wg)
println("ddd")
xclose(c1) xclose(c1)
xclose(c2) xclose(c2)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment