Commit 65b17bdc authored by Kirill Smelkov

X rework Conn acceptance to be explicit via NodeLink.Accept

This way programming is more straightforward, e.g. for the case where we
first have to process identification and only then serve.

This is also closer to how regular sockets work.
parent a024d393
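
For orientation, here is a rough sketch of how a listening peer drives incoming connections with the new explicit API, next to the previous callback style. serve() is a placeholder handler used only for illustration; it is not defined in this commit.

	// before: a handler was registered up front and NodeLink spawned it per connection
	//	nl.HandleNewConn(func(c *Conn) { serve(c) })
	//	nl.Wait()

	// after: connections are accepted explicitly, socket-style
	func serveLink(nl *NodeLink) {
		for {
			conn, err := nl.Accept()
			if err != nil {
				// e.g. ErrLinkClosed after NodeLink.Close, or ErrLinkNoListen
				// if this end was created with the client role
				return
			}
			go serve(conn) // placeholder handler, not part of this commit
		}
	}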
@@ -33,11 +33,11 @@ import (
 //
 // New connection can be created with .NewConn() . Once connection is
 // created and data is sent over it, on peer's side another corresponding
-// new connection will be created - accepting first packet "request" - and all
+// new connection will be created - accepting first packet "request" - and all	XXX -> Accept
 // further communication send/receive exchange will be happening in between
 // those 2 connections.
 //
-// For a node to be able to accept new incoming connection it has to register
+// For a node to be able to accept new incoming connection it has to register	XXX -> Accept
 // corresponding handler with .HandleNewConn() . Without such handler
 // registered the node will be able to only initiate new connections, not
 // accept new ones from its peer.
@@ -53,8 +53,11 @@ type NodeLink struct {
 	nextConnId uint32 // next connId to use for Conn initiated by us
 	serveWg sync.WaitGroup // for serve{Send,Recv}
-	handleWg sync.WaitGroup // for spawned handlers
-	handleNewConn func(conn *Conn) // handler for new connections
+	// handleWg sync.WaitGroup // for spawned handlers
+	// handleNewConn func(conn *Conn) // handler for new connections
+	acceptq chan *Conn // queue of incoming connections for Accept
+	                   // = nil if NodeLink is not accepting connections
 	txreq chan txReq // tx requests from Conns go via here
 	closed chan struct{}
@@ -78,7 +81,7 @@ type Conn struct {
 	closed chan struct{}
 }
-// A role our end of NodeLink is intended to play
+// LinkRole is a role an end of NodeLink is intended to play
 type LinkRole int
 const (
 	LinkServer LinkRole = iota // link created as server
@@ -89,10 +92,10 @@ const (
 	linkFlagsMask LinkRole = (1<<32 - 1) << 16
 )
-// Make a new NodeLink from already established net.Conn
+// NewNodeLink makes a new NodeLink from already established net.Conn
 //
 // Role specifies how to treat our role on the link - either as client or
-// server one. The difference in between client and server roles is only in
+// server one. The difference in between client and server roles is only in	XXX + acceptq
 // how connection ids are allocated for connections initiated at our side:
 // there is no conflict in identifiers if one side always allocates them as
 // even (server) and its peer as odd (client).
@@ -101,11 +104,14 @@ const (
 // net.Listen/net.Accept and client role for connections created via net.Dial.
 func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink {
 	var nextConnId uint32
+	var acceptq chan *Conn
 	switch role&^linkFlagsMask {
 	case LinkServer:
 		nextConnId = 0 // all initiated by us connId will be even
+		acceptq = make(chan *Conn) // accept queue; TODO use backlog
 	case LinkClient:
 		nextConnId = 1 // ----//---- odd
+		acceptq = nil // not accepting incoming connections
 	default:
 		panic("invalid conn role")
 	}
@@ -114,6 +120,7 @@ func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink {
 		peerLink: conn,
 		connTab: map[uint32]*Conn{},
 		nextConnId: nextConnId,
+		acceptq: acceptq,
 		txreq: make(chan txReq),
 		closed: make(chan struct{}),
 	}
@@ -125,9 +132,11 @@ func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink {
 	return nl
 }
-// Close node-node link.
+// Close closes node-node link.
 // IO on connections established over it is automatically interrupted with an error.
 func (nl *NodeLink) Close() error {
+	// XXX what with .acceptq ?
 	// mark all active Conns as closed
 	nl.connMu.Lock()
 	defer nl.connMu.Unlock()
@@ -150,7 +159,7 @@ func (nl *NodeLink) Close() error {
 	return err
 }
-// send raw packet to peer
+// sendPkt sends raw packet to peer
 func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
 	if true {
 		// XXX -> log
@@ -165,7 +174,7 @@ func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
 	return err
 }
-// receive raw packet from peer
+// recvPkt receives raw packet from peer
 func (nl *NodeLink) recvPkt() (*PktBuf, error) {
 	// TODO organize rx buffers management (freelist etc)
 	// TODO cleanup lots of ntoh32(...)
@@ -174,7 +183,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
 	// first read to read pkt header and hopefully up to page of data in 1 syscall
 	pkt := &PktBuf{make([]byte, 4096)}
-	// TODO reenable, but NOTE next packet can be also prefetched here
+	// TODO reenable, but NOTE next packet can be also prefetched here -> use buffering ?
 	//n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, PktHeadLen)
 	n, err := io.ReadFull(nl.peerLink, pkt.Data[:PktHeadLen])
 	if err != nil {
@@ -193,6 +202,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
 		panic("TODO message too big")
 	}
+	//pkt.Data = xbytes.Resize32(pkt.Data, ntoh32(pkth.Len))
 	if ntoh32(pkth.Len) > uint32(cap(pkt.Data)) {
 		// grow rxbuf
 		rxbuf2 := make([]byte, ntoh32(pkth.Len))
@@ -231,7 +241,7 @@ func (nl *NodeLink) newConn(connId uint32) *Conn {
 	return c
 }
-// Create a connection on top of node-node link
+// NewConn creates a connection on top of node-node link
 func (nl *NodeLink) NewConn() *Conn {
 	nl.connMu.Lock()
 	defer nl.connMu.Unlock()
@@ -243,6 +253,27 @@ func (nl *NodeLink) NewConn() *Conn {
 	return c
 }
+// ErrLinkClosed is the error indicated for operations on closed NodeLink
+var ErrLinkClosed = errors.New("node link closed")
+var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
+
+// Accept waits for and accepts incoming connection on top of node-node link
+func (nl *NodeLink) Accept() (*Conn, error) {
+	// this node link is not accepting connections
+	if nl.acceptq == nil {
+		return nil, ErrLinkNoListen
+	}
+	select {
+	case <-nl.closed:
+		return nil, ErrLinkClosed // XXX + op = Accept ?
+		// XXX check acceptq != nil ?
+	case c := <-nl.acceptq:
+		return c, nil
+	}
+}
 // serveRecv handles incoming packets routing them to either appropriate
 // already-established connection or to new handling goroutine.
@@ -263,14 +294,17 @@ func (nl *NodeLink) serveRecv() {
 		// pkt.ConnId -> Conn
 		connId := ntoh32(pkt.Header().ConnId)
-		var handleNewConn func(conn *Conn)
+		// var handleNewConn func(conn *Conn)
 		nl.connMu.Lock()
 		conn := nl.connTab[connId]
 		if conn == nil {
-			handleNewConn = nl.handleNewConn
-			if handleNewConn != nil {
+			//handleNewConn = nl.handleNewConn // XXX -> Accept
+			//if handleNewConn != nil {
+			if nl.acceptq != nil {
 				conn = nl.newConn(connId)
+				// XXX what if Accept exited because of just recently close(nl.closed)?
+				nl.acceptq <- conn
 			}
 		}
 		nl.connMu.Unlock()
@@ -281,29 +315,31 @@ func (nl *NodeLink) serveRecv() {
 			continue
 		}
-		// we are accepting new incoming connection - spawn
-		// connection-serving goroutine
-		if handleNewConn != nil {
-			// TODO avoid spawning goroutine for each new Ask request -
-			// - by keeping pool of read inactive goroutine / conn pool ?
-			// XXX rework interface for this to be Accept-like ?
-			go func() {
-				nl.handleWg.Add(1)
-				defer nl.handleWg.Done()
-				handleNewConn(conn)
-			}()
-		}
+		// // XXX -> accept
+		// // we are accepting new incoming connection - spawn
+		// // connection-serving goroutine
+		// if handleNewConn != nil {
+		// 	// TODO avoid spawning goroutine for each new Ask request -
+		// 	// - by keeping pool of read inactive goroutine / conn pool ?
+		// 	// XXX rework interface for this to be Accept-like ?
+		// 	go func() {
+		// 		nl.handleWg.Add(1)
+		// 		defer nl.handleWg.Done()
+		// 		handleNewConn(conn)
+		// 	}()
+		// }
 		// route packet to serving goroutine handler
+		// XXX what if Conn.Recv exited because of just recently close(nl.closed) ?
 		conn.rxq <- pkt
 	}
 }
-// wait for all handlers spawned for accepted connections to complete
-// XXX naming -> WaitHandlers ?
-func (nl *NodeLink) Wait() {
-	nl.handleWg.Wait()
-}
+// // wait for all handlers spawned for accepted connections to complete
+// // XXX naming -> WaitHandlers ?
+// func (nl *NodeLink) Wait() {
+// 	nl.handleWg.Wait()
+// }
 // request to transmit a packet. Result error goes back to errch
@@ -333,13 +369,13 @@ runloop:
 	}
 }
-// XXX move to NodeLink ctor ?
-// Set handler for new incoming connections
-func (nl *NodeLink) HandleNewConn(h func(*Conn)) {
-	nl.connMu.Lock()
-	defer nl.connMu.Unlock()
-	nl.handleNewConn = h // NOTE can change handler at runtime XXX do we need this?
-}
+// // XXX move to NodeLink ctor ?
+// // Set handler for new incoming connections
+// func (nl *NodeLink) HandleNewConn(h func(*Conn)) {
+// 	nl.connMu.Lock()
+// 	defer nl.connMu.Unlock()
+// 	nl.handleNewConn = h // NOTE can change handler at runtime XXX do we need this?
+// }
 // ErrClosedConn is the error indicated for read/write operations on closed Conn
@@ -426,7 +462,7 @@ func (c *Conn) Close() error {
 // for convinience: Dial/Listen
-// Connect to address on named network and wrap the connection as NodeLink
+// Dial connects to address on named network and wrap the connection as NodeLink
 // TODO +tls.Config
 func Dial(ctx context.Context, network, address string) (*NodeLink, error) {
 	d := net.Dialer{}
...
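
The XXX notes in serveRecv above ("what if Accept / Conn.Recv exited because of just recently close(nl.closed)?") point at a real hazard: the plain channel sends on nl.acceptq and conn.rxq can block forever if the receiver already gave up because the link was closed. One possible shape of a fix, shown here only as an illustration and not as part of this commit (the helper name tryAccept is made up), is to pair each such send with the closed channel in a select:

	// sketch: hand an accepted connection to a pending Accept without risking a permanent block
	func (nl *NodeLink) tryAccept(conn *Conn) {
		select {
		case nl.acceptq <- conn:
			// picked up by a pending Accept call
		case <-nl.closed:
			// link was closed concurrently; nobody will Accept this conn anymore
		}
	}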
@@ -177,6 +177,25 @@ func TestNodeLink(t *testing.T) {
 	xwait(wg)
 	xclose(nl2)
+	// Close vs Accept
+	nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
+	wg = WorkGroup()
+	wg.Gox(func() {
+		tdelay()
+		xclose(nl2)
+	})
+	c, err := nl2.Accept()
+	if !(c == nil && err == ErrLinkClosed) {
+		t.Fatalf("NodeLink.Accept() after close: conn = %v, err = %v", c, err)
+	}
+	// nl1 is not accepting connections - because it has LinkClient role
+	// check Accept behaviour.
+	c, err = nl1.Accept()
+	if !(c == nil && err == ErrLinkNoListen) {
+		t.Fatalf("NodeLink.Accept() on non-listening node link: conn = %v, err = %v", c, err)
+	}
+	xclose(nl1)
 	// raw exchange
 	nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
@@ -212,7 +231,7 @@ func TestNodeLink(t *testing.T) {
 	// Close vs Recv
 	nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
-	c := nl1.NewConn()
+	c = nl1.NewConn()
 	wg = WorkGroup()
 	wg.Gox(func() {
 		tdelay()
@@ -267,8 +286,12 @@ func TestNodeLink(t *testing.T) {
 	// Conn accept + exchange
 	nl1, nl2 = nodeLinkPipe()
-	nl2.HandleNewConn(func(c *Conn) {
-		// TODO raised err -> errch
+	wg = WorkGroup()
+	//nl2.HandleNewConn(func(c *Conn) {
+	wg.Gox(func() {
+		c, err := nl2.Accept() // XXX -> xaccept ?
+		exc.Raiseif(err)
 		pkt := xrecv(c)
 		xverifyPkt(pkt, c.connId, 33, []byte("ping"))
@@ -289,7 +312,8 @@ func TestNodeLink(t *testing.T) {
 	xsend(c, mkpkt(35, []byte("ping2")))
 	pkt = xrecv(c)
 	xverifyPkt(pkt, c.connId, 36, []byte("pong2"))
-	nl2.Wait()
+	//nl2.Wait()
+	xwait(wg)
 	xclose(c)
 	xclose(nl1)
@@ -297,6 +321,7 @@ func TestNodeLink(t *testing.T) {
 	// test 2 channels with replies comming in reversed time order
 	nl1, nl2 = nodeLinkPipe()
+	wg = WorkGroup()
 	replyOrder := map[uint16]struct { // "order" in which to process requests
 		start chan struct{} // processing starts when start chan is ready
 		next uint16 // after processing this switch to next
@@ -306,21 +331,28 @@ func TestNodeLink(t *testing.T) {
 	}
 	close(replyOrder[2].start)
-	nl2.HandleNewConn(func(c *Conn) {
-		// TODO raised err -> errch
-		pkt := xrecv(c)
-		n := ntoh16(pkt.Header().MsgCode)
-		x := replyOrder[n]
-		// wait before it is our turn & echo pkt back
-		<-x.start
-		xsend(c, pkt)
-		xclose(c)
-		// tell next it can start
-		if x.next != 0 {
-			close(replyOrder[x.next].start)
-		}
-	})
+	//nl2.HandleNewConn(func(c *Conn) {
+	wg.Gox(func() {
+		for _ = range replyOrder {
+			c, err := nl2.Accept()
+			exc.Raiseif(err)
+			wg.Gox(func() {
+				pkt := xrecv(c)
+				n := ntoh16(pkt.Header().MsgCode)
+				x := replyOrder[n]
+				// wait before it is our turn & echo pkt back
+				<-x.start
+				xsend(c, pkt)
+				xclose(c)
+				// tell next it can start
+				if x.next != 0 {
+					close(replyOrder[x.next].start)
+				}
+			})
+		}
+	})
@@ -336,7 +368,8 @@ func TestNodeLink(t *testing.T) {
 	}
 	xechoWait(c2, 2)
 	xechoWait(c1, 1)
-	nl2.Wait()
+	//nl2.Wait()
+	xwait(wg)
 	xclose(c1)
 	xclose(c2)
...
@@ -80,7 +80,8 @@ func ListenAndServe(ctx context.Context, net_, laddr string, srv Server) error {
 // ----------------------------------------
 // Identify identifies peer on the link
-// it expects peer to send RequestIdentification packet and TODO
+// it expects peer to send RequestIdentification packet and replies with AcceptIdentification if identification parameters are ok.
+// returns information about identified node or error.
 func Identify(link *NodeLink) (nodeInfo RequestIdentification /*TODO -> NodeInfo*/, err error) {
 	// the first conn must come with RequestIdentification packet
 	conn, err := link.Accept()
...
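
For orientation, the Identify flow documented above could be structured roughly as below on top of the new Accept API. decodeRequestIdentification and sendAcceptIdentification are hypothetical placeholders for the project's packet encode/decode code, which is not shown in this diff; real parameter checks are omitted.

	func identifySketch(link *NodeLink) (RequestIdentification, error) {
		// the peer is expected to open the first connection with RequestIdentification
		conn, err := link.Accept()
		if err != nil {
			return RequestIdentification{}, err
		}
		req, err := decodeRequestIdentification(conn) // hypothetical helper
		if err != nil {
			return RequestIdentification{}, err
		}
		// reply AcceptIdentification if the parameters check out (checks omitted)
		err = sendAcceptIdentification(conn, req) // hypothetical helper
		if err != nil {
			return RequestIdentification{}, err
		}
		return req, nil
	}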