Commit d4be13da authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent b978935d
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
// //
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// NEO | Connection management // NEO. Connection management
package neo package neo
...@@ -20,8 +20,6 @@ import ( ...@@ -20,8 +20,6 @@ import (
"net" "net"
"sync" "sync"
"unsafe" "unsafe"
//"fmt"
) )
// NodeLink is a node-node link in NEO // NodeLink is a node-node link in NEO
...@@ -48,8 +46,8 @@ import ( ...@@ -48,8 +46,8 @@ import (
type NodeLink struct { type NodeLink struct {
peerLink net.Conn // raw conn to peer peerLink net.Conn // raw conn to peer
connMu sync.Mutex // TODO -> RW ? connMu sync.Mutex // TODO -> RW ?
connTab map[uint32]*Conn // connId -> Conn associated with connId connTab map[uint32]*Conn // connId -> Conn associated with connId
nextConnId uint32 // next connId to use for Conn initiated by us nextConnId uint32 // next connId to use for Conn initiated by us
serveWg sync.WaitGroup // for serve{Send,Recv} serveWg sync.WaitGroup // for serve{Send,Recv}
...@@ -72,43 +70,28 @@ type Conn struct { ...@@ -72,43 +70,28 @@ type Conn struct {
rxq chan *PktBuf // received packets for this Conn go here rxq chan *PktBuf // received packets for this Conn go here
txerr chan error // transmit errors for this Conn go back here txerr chan error // transmit errors for this Conn go back here
// Conn has to be explicitly closed by user; it can also be closed by NodeLink.Close // once because: Conn has to be explicitly closed by user; it can also
// be closed by NodeLink.Close .
closeOnce sync.Once closeOnce sync.Once
closed chan struct{} closed chan struct{}
} }
// Buffer with packet data // A role our end of NodeLink is intended to play
// XXX move me out of here type LinkRole int
type PktBuf struct {
Data []byte // whole packet data including all headers XXX -> Buf ?
}
// Get pointer to packet header
func (pkt *PktBuf) Header() *PktHead {
// XXX check len(Data) < PktHead ? -> no, Data has to be allocated with cap >= PktHeadLen
return (*PktHead)(unsafe.Pointer(&pkt.Data[0]))
}
// Get packet payload
func (pkt *PktBuf) Payload() []byte {
return pkt.Data[PktHeadLen:]
}
type ConnRole int
const ( const (
ConnServer ConnRole = iota // connection created as server LinkServer ConnRole = iota // link created as server
ConnClient // connection created as client LinkClient // link created as client
// for testing: // for testing:
connNoRecvSend ConnRole = 1<<16 // do not spawn serveRecv & serveSend linkNoRecvSend ConnRole = 1<<16 // do not spawn serveRecv & serveSend
connFlagsMask ConnRole = (1<<32 - 1) << 16 linkFlagsMask ConnRole = (1<<32 - 1) << 16
) )
// Make a new NodeLink from already established net.Conn // Make a new NodeLink from already established net.Conn
// //
// role specifies how to treat conn - either as server or client one. // role specifies how to treat conn - either as server or client one.
// The difference in between client and server roles are in connid % 2 XXX text // The difference in between client and server roles is only in how connection
// ids are allocated for connections initiated at our side: there is no overlap in identifiers if one side always allocates them as even and its peer as odd. in connId % 2 XXX text
// //
// Usually server role should be used for connections created via // Usually server role should be used for connections created via
// net.Listen/net.Accept and client role for connections created via net.Dial. // net.Listen/net.Accept and client role for connections created via net.Dial.
......
...@@ -15,12 +15,9 @@ ...@@ -15,12 +15,9 @@
package neo package neo
import ( import (
//"fmt"
"bytes" "bytes"
"context" "context"
"io" "io"
//"fmt"
"net" "net"
"testing" "testing"
"time" "time"
...@@ -28,7 +25,6 @@ import ( ...@@ -28,7 +25,6 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/exc" "lab.nexedi.com/kirr/go123/exc"
//"lab.nexedi.com/kirr/go123/myname"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
) )
...@@ -118,7 +114,7 @@ func xverifyPkt(pkt *PktBuf, connid uint32, msgcode uint16, payload []byte) { ...@@ -118,7 +114,7 @@ func xverifyPkt(pkt *PktBuf, connid uint32, msgcode uint16, payload []byte) {
errv.Appendf("header: unexpected length %v (want %v)", ntoh32(h.Len), PktHeadLen + len(payload)) errv.Appendf("header: unexpected length %v (want %v)", ntoh32(h.Len), PktHeadLen + len(payload))
} }
if !bytes.Equal(pkt.Payload(), payload) { if !bytes.Equal(pkt.Payload(), payload) {
errv.Appendf("payload differ") // XXX also print payload ? errv.Appendf("payload differ")
} }
exc.Raiseif( errv.Err() ) exc.Raiseif( errv.Err() )
...@@ -131,13 +127,14 @@ func tdelay() { ...@@ -131,13 +127,14 @@ func tdelay() {
time.Sleep(1*time.Millisecond) time.Sleep(1*time.Millisecond)
} }
// create NodeLinks connected via net.Pipe
func _nodeLinkPipe(flags1, flags2 ConnRole) (nl1, nl2 *NodeLink) { func _nodeLinkPipe(flags1, flags2 ConnRole) (nl1, nl2 *NodeLink) {
node1, node2 := net.Pipe() node1, node2 := net.Pipe()
nl1 = NewNodeLink(node1, ConnClient | flags1) nl1 = NewNodeLink(node1, ConnClient | flags1)
nl2 = NewNodeLink(node2, ConnServer | flags2) nl2 = NewNodeLink(node2, ConnServer | flags2)
return nl1, nl2 return nl1, nl2
} }
// create NodeLinks connected via net.Pipe
func nodeLinkPipe() (nl1, nl2 *NodeLink) { func nodeLinkPipe() (nl1, nl2 *NodeLink) {
return _nodeLinkPipe(0, 0) return _nodeLinkPipe(0, 0)
} }
...@@ -174,7 +171,7 @@ func TestNodeLink(t *testing.T) { ...@@ -174,7 +171,7 @@ func TestNodeLink(t *testing.T) {
xwait(wg) xwait(wg)
xclose(nl2) xclose(nl2)
// check raw exchange works // raw exchange
nl1, nl2 = _nodeLinkPipe(connNoRecvSend, connNoRecvSend) nl1, nl2 = _nodeLinkPipe(connNoRecvSend, connNoRecvSend)
wg, ctx := WorkGroupCtx(context.Background()) wg, ctx := WorkGroupCtx(context.Background())
...@@ -286,10 +283,10 @@ func TestNodeLink(t *testing.T) { ...@@ -286,10 +283,10 @@ func TestNodeLink(t *testing.T) {
xsend(c, mkpkt(35, []byte("ping2"))) xsend(c, mkpkt(35, []byte("ping2")))
pkt = xrecv(c) pkt = xrecv(c)
xverifyPkt(pkt, c.connId, 36, []byte("pong2")) xverifyPkt(pkt, c.connId, 36, []byte("pong2"))
nl2.Wait()
xclose(c) xclose(c)
xclose(nl1) xclose(nl1)
nl2.Wait()
xclose(nl2) xclose(nl2)
// test 2 channels with replies comming in reversed time order // test 2 channels with replies comming in reversed time order
...@@ -304,6 +301,7 @@ func TestNodeLink(t *testing.T) { ...@@ -304,6 +301,7 @@ func TestNodeLink(t *testing.T) {
close(replyOrder[2].start) close(replyOrder[2].start)
nl2.HandleNewConn(func(c *Conn) { nl2.HandleNewConn(func(c *Conn) {
// TODO raised err -> errch
pkt := xrecv(c) pkt := xrecv(c)
n := ntoh16(pkt.Header().MsgCode) n := ntoh16(pkt.Header().MsgCode)
x := replyOrder[n] x := replyOrder[n]
...@@ -332,10 +330,10 @@ func TestNodeLink(t *testing.T) { ...@@ -332,10 +330,10 @@ func TestNodeLink(t *testing.T) {
} }
xechoWait(c2, 2) xechoWait(c2, 2)
xechoWait(c1, 1) xechoWait(c1, 1)
nl2.Wait()
xclose(c1) xclose(c1)
xclose(c2) xclose(c2)
xclose(nl1) xclose(nl1)
nl2.Wait()
xclose(nl2) xclose(nl2)
} }
This diff is collapsed.
This diff is collapsed.
// TODO text what it does (generates code for pkt.go) // TODO text what it does (generates code for proto.go)
// +build ignore // +build ignore
...@@ -17,7 +17,7 @@ func main() { ...@@ -17,7 +17,7 @@ func main() {
fset := token.NewFileSet() fset := token.NewFileSet()
var mode parser.Mode = 0 // parser.Trace var mode parser.Mode = 0 // parser.Trace
f, err := parser.ParseFile(fset, "pkt.go", nil, mode) f, err := parser.ParseFile(fset, "proto.go", nil, mode)
if err != nil { if err != nil {
panic(err) // XXX log panic(err) // XXX log
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment