Commit ec4b3ce0 authored by Kirill Smelkov

go/neo/neonet: Lightweight mode

In situations where connections are used to send/receive only 1
request/response, the overhead to create and shut down full connections can be
too high. Unfortunately this is exactly the mode that is currently primarily
used for compatibility with NEO/py. To help mitigate the overhead in such
scenarios, a lightweight connections mode is provided:

On the requester side, one message can be sent over a node link with
link.Send1. Internally a connection will be created and then shut down, but
since the code manages the whole process itself and does not expose the
connection to the user, it can optimize those operations significantly.
Similarly link.Ask1 sends 1 request, receives 1 response, and then puts the
connection back into the pool for later reuse.

On the receiver side, link.Recv1 accepts a connection together with the first
message the remote peer sent when establishing it, and wraps the result into a
Request object. The Request contains the received message and, internally, the
connection. A response can be sent back via Request.Reply. Once Request.Close
is called, the accepted connection object is immediately put back into the
pool for later reuse.
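
For illustration, a minimal usage sketch (not part of the code below; the
message types and values are placeholders borrowed from the tests/benchmarks):

	// requester side
	err := link.Send1(&proto.Ping{})  // one-way: 1 message, no reply

	get := &proto.GetObject{}         // 1 request - 1 response
	obj := &proto.AnswerObject{}
	err = link.Ask1(get, obj)

	// receiver side
	req, err := link.Recv1()          // connection accepted with its first message
	if err == nil {
		err = req.Reply(&proto.AnswerObject{})  // 0|1 reply
		req.Close()                             // connection is returned to the pool
	}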

Some history of lightweight mode:

lab.nexedi.com/kirr/neo/commit/0fa96338	X Clarified Request.Close semantics - tests working again
lab.nexedi.com/kirr/neo/commit/a5ac1652	X Ask1: switch to sending directly over link
lab.nexedi.com/kirr/neo/commit/755e3654	X Request.Reply: switch to replying directly over link
lab.nexedi.com/kirr/neo/commit/c643ba53	X Send1: switch to sending directly over link
lab.nexedi.com/kirr/neo/commit/7dcbc9c5	X Send1: switch to lightClose
lab.nexedi.com/kirr/neo/commit/851864a9	X chan RTT benchmark which simulates Recv1 = Accept + Recv
lab.nexedi.com/kirr/neo/commit/099bfc29	X freelist for PktBuf
lab.nexedi.com/kirr/neo/commit/58c2e39a	X Benchmark for link Ask1/Recv1 over TCP loopback
parent 39982595
@@ -36,6 +36,29 @@
//
// See also package lab.nexedi.com/kirr/neo/go/neo/proto for definition of NEO
// messages.
//
//
// Lightweight mode
//
// In situations where connections are used to send/receive only 1
// request/response, the overhead to create and shut down full connections can
// be too high. Unfortunately this is exactly the mode that is currently
// primarily used for compatibility with NEO/py. To help mitigate the overhead
// in such scenarios, a lightweight connections mode is provided:
//
// On the requester side, one message can be sent over a node link with
// link.Send1. Internally a connection will be created and then shut down, but
// since the code manages the whole process itself and does not expose the
// connection to the user, it can optimize those operations significantly.
// Similarly link.Ask1 sends 1 request, receives 1 response, and then puts the
// connection back into the pool for later reuse.
//
// On the receiver side, link.Recv1 accepts a connection together with the
// first message the remote peer sent when establishing it, and wraps the
// result into a Request object. The Request contains the received message
// and, internally, the connection. A response can be sent back via
// Request.Reply. Once Request.Close is called, the accepted connection object
// is immediately put back into the pool for later reuse.
package neonet

// XXX neonet compatibility with NEO/py depends on the following small NEO/py patch:
@@ -152,7 +175,17 @@ type Conn struct {
	rxdownFlag atomic32 // 1 when RX is marked no longer operational
	// XXX ^^^ split to different cache lines?

	rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed  XXX !light?

	// there are two modes a Conn could be used in:
	// - full mode  - where full Conn functionality is working, and
	// - light mode - where only a subset of the functionality is working.
	//
	// the light mode is used to implement Recv1 & friends - there any
	// connection is used to send and/or receive at most 1 packet and then
	// has to be reused; for efficiency, ideally without reallocating anything.
	//
	// everything below is used during !light mode only.

	// rxdown  chan struct{} // ready when RX is marked no longer operational
	rxdownOnce sync.Once // ----//----  XXX review
@@ -247,17 +280,72 @@ func newNodeLink(conn net.Conn, role _LinkRole) *NodeLink {
	return nl
}

// connPool is freelist for Conn.
// XXX make it per-link?
var connPool = sync.Pool{New: func() interface{} {
	return &Conn{
		rxq:    make(chan *pktBuf, 1), // NOTE non-blocking - see serveRecv  XXX +buf ?
		txerr:  make(chan error, 1),   // NOTE non-blocking - see Conn.Send
		txdown: make(chan struct{}),
		// rxdown: make(chan struct{}),
	}
}}
// connAlloc allocates Conn from freelist.
func (link *NodeLink) connAlloc(connId uint32) *Conn {
	c := connPool.Get().(*Conn)
	c.reinit()
	c.link = link
	c.connId = connId
	return c
}

// release releases connection to freelist.
func (c *Conn) release() {
	c.reinit() // XXX just in case
	connPool.Put(c)
}

// reinit reinitializes connection after reallocating it from freelist.
func (c *Conn) reinit() {
	c.link = nil
	c.connId = 0
	// .rxq - set initially; does not change

	c.rxqWrite.Set(0)   // XXX store relaxed?
	c.rxqRead.Set(0)    // ----//----
	c.rxdownFlag.Set(0) // ----//----

	c.rxerrOnce = sync.Once{} // XXX ok?

	// XXX vvv not strictly needed for light mode?
	// ensureOpen(&c.rxdown)
	c.rxdownOnce = sync.Once{} // XXX ok?
	c.rxclosed.Set(0)

	// .txerr - never closed

	ensureOpen(&c.txdown)
	c.txdownOnce = sync.Once{} // XXX ok?
	c.txclosed.Set(0)

	c.closeOnce = sync.Once{} // XXX ok?
}
// ensureOpen makes sure *ch stays a non-closed chan struct{} for signalling.
// If it is already closed, the channel is remade.
func ensureOpen(ch *chan struct{}) {
	select {
	case <-*ch:
		*ch = make(chan struct{})
	default:
		// not closed - nothing to do
	}
}
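
// Illustration of ensureOpen semantics (a sketch added for clarity; not part
// of the original commit):
//
//	ch := make(chan struct{})
//	ensureOpen(&ch) // no-op - ch is still open
//	close(ch)
//	ensureOpen(&ch) // ch was closed -> *ch is replaced with a fresh open channel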
// newConn creates new Conn with id=connId and registers it into connTab.
// must be called with connMu held.
func (link *NodeLink) newConn(connId uint32) *Conn {
	c := link.connAlloc(connId)
	link.connTab[connId] = c
	return c
}
@@ -436,7 +524,7 @@ func (c *Conn) shutdownRX(errMsg *proto.Error) {

// downRX marks .rxq as no longer operational.
//
// used in shutdownRX and separately to mark RX down for light Conns.
func (c *Conn) downRX(errMsg *proto.Error) {
	// let serveRecv know RX is down for this connection
	c.rxdownFlag.Set(1) // XXX cmpxchg and return if already down?
@@ -816,6 +904,7 @@ func (nl *NodeLink) serveRecv() {
/*
// XXX goes away in favour of .rxdownFlag; reasons
// - no need to reallocate rxdown for light conn
// - no select
//
// XXX review synchronization via flags for correctness (e.g.
@@ -1353,3 +1442,152 @@ func (c *Conn) Ask(req proto.Msg, resp proto.Msg) error {
	return err
}
// ---- exchange of 1-1 request-reply ----
// (impedance matcher for current neo/py implementation)

// lightClose closes light connection.
//
// No Send or Recv must be in flight.
// The caller must not use c after a call to lightClose - the connection is returned to the freelist.
//
// Must be called only once.
func (c *Conn) lightClose() {
	nl := c.link
	releaseok := false
	nl.connMu.Lock()
	if nl.connTab != nil {
		// XXX find a way to keep a conn initiated by us as closed for some time (see Conn.Close)
		// but a timer costs too much...
		delete(nl.connTab, c.connId)
		releaseok = true
	}
	nl.connMu.Unlock()

	// we can release c only if we removed it from connTab.
	//
	// if not - the scenario could be: nl.shutdown sets nl.connTab=nil and
	// then iterates a connTab snapshot taken under nl.connMu lock. If so,
	// that activity (which calls e.g. Conn.shutdown) could be running in
	// parallel with us.
	if releaseok {
		c.release()
	}
}
// Request is a message received from the link + (internally) a connection handle to make a reply.
//
// Request represents the 1 request - 0|1 reply interaction model.
//
// See "Lightweight mode" in top-level package doc for overview.
type Request struct {
	Msg  proto.Msg
	conn *Conn
}
// Recv1 accepts a connection with the first message peer sent us when establishing it.
//
// See "Lightweight mode" in top-level package doc for overview.
func (link *NodeLink) Recv1() (Request, error) {
	conn, err := link.Accept()
	if err != nil {
		return Request{}, err // XXX or return *Request? (want to avoid alloc)
	}

	// NOTE serveRecv guarantees that when a conn is accepted, there is 1 message in conn.rxq
	msg, err := conn.Recv() // XXX better directly from <-rxq ?
	if err != nil {
		conn.Close() // XXX -> conn.lightClose()
		return Request{}, err
	}

	// no one will be reading from conn anymore - mark rx down so that if the
	// peer sends another packet with the same .connID, serveRecv does not
	// deadlock trying to put it to conn.rxq.
	conn.downRX(errConnClosed)

	return Request{Msg: msg, conn: conn}, nil
}
// Reply sends response to request.
//
// See "Lightweight mode" in top-level package doc for overview.
func (req *Request) Reply(resp proto.Msg) error {
	return req.conn.sendMsgDirect(resp)
	//err1 := req.conn.Send(resp)
	//err2 := req.conn.Close() // XXX no - only Send here?
	//return xerr.First(err1, err2)
}
// Close must be called to free request resources.
//
// Close must be called exactly once.
// The request object cannot be used any more after a call to Close.
//
// See "Lightweight mode" in top-level package doc for overview.
func (req *Request) Close() { // XXX +error?
	// XXX req.Msg.Release() ?
	req.Msg = nil
	req.conn.lightClose()
	req.conn = nil // just in case
}
// Send1 sends one message over a new connection.
//
// It creates the new connection internally, and shuts it down after the
// transmission completes.
//
// See "Lightweight mode" in top-level package doc for overview.
func (link *NodeLink) Send1(msg proto.Msg) error {
	conn, err := link.NewConn()
	if err != nil {
		return err
	}

	conn.downRX(errConnClosed) // XXX just initially create conn this way
	err = conn.sendMsgDirect(msg)
	conn.lightClose()
	return err
}
// Ask1 sends request and receives response in 1-1 model.
//
// See Conn.Ask for semantic details.
//
// See "Lightweight mode" in top-level package doc for overview.
func (link *NodeLink) Ask1(req proto.Msg, resp proto.Msg) (err error) {
	conn, err := link.NewConn()
	if err != nil {
		return err
	}

	//defer conn.lightClose()
	err = conn._Ask1(req, resp)
	conn.lightClose()
	return err
}

func (conn *Conn) _Ask1(req proto.Msg, resp proto.Msg) error {
	err := conn.sendMsgDirect(req)
	if err != nil {
		return err
	}

	nerr := &proto.Error{}
	which, err := conn.Expect(resp, nerr)
	switch which {
	case 0:
		return nil
	case 1:
		return nerr
	}

	return err
}

func (req *Request) Link() *NodeLink {
	return req.conn.Link()
}
@@ -24,6 +24,7 @@ import (
	"context"
	"io"
	"net"
	"reflect"
	"runtime"
	"testing"
	"time"
@@ -36,8 +37,10 @@ import (
	"lab.nexedi.com/kirr/neo/go/internal/packed"
	"lab.nexedi.com/kirr/neo/go/neo/proto"
	"lab.nexedi.com/kirr/neo/go/zodb"

	"github.com/kylelemons/godebug/pretty"
	"github.com/pkg/errors"
)

func xclose(c io.Closer) {
@@ -611,6 +614,116 @@ func TestNodeLink(t *testing.T) {
}
// ---- recv1 mode ----

func xSend(c *Conn, msg proto.Msg) {
	err := c.Send(msg)
	exc.Raiseif(err)
}

func xRecv(c *Conn) proto.Msg {
	msg, err := c.Recv()
	exc.Raiseif(err)
	return msg
}

func xRecv1(l *NodeLink) Request {
	req, err := l.Recv1()
	exc.Raiseif(err)
	return req
}

func xSend1(l *NodeLink, msg proto.Msg) {
	err := l.Send1(msg)
	exc.Raiseif(err)
}

func xverifyMsg(msg1, msg2 proto.Msg) {
	if !reflect.DeepEqual(msg1, msg2) {
		exc.Raisef("messages differ:\n%s", pretty.Compare(msg1, msg2))
	}
}
func TestRecv1Mode(t *testing.T) {
	// Send1
	nl1, nl2 := nodeLinkPipe()
	wg := &errgroup.Group{}
	sync := make(chan int)

	gox(wg, func() {
		defer func() {
			if e := recover(); e != nil {
				panic(e)
			}
		}()

		c := xaccept(nl2)
		msg := xRecv(c)
		xverifyMsg(msg, &proto.Ping{})
		xSend(c, &proto.Pong{})
		msg = xRecv(c)
		xverifyMsg(msg, errConnClosed)
		xclose(c)

		sync <- 1

		c = xaccept(nl2)
		msg = xRecv(c)
		xverifyMsg(msg, &proto.Error{proto.ACK, "hello up there"})
		xSend(c, &proto.Error{proto.ACK, "hello to you too"})
		msg = xRecv(c)
		xverifyMsg(msg, errConnClosed)
		xclose(c)
	})

	xSend1(nl1, &proto.Ping{})

	// before the next Send1, wait till the peer receives errConnClosed from us -
	// otherwise the peer could already be in accept while our errConnClosed is
	// received, and there is only one receiving thread there ^^^
	<-sync
	xSend1(nl1, &proto.Error{proto.ACK, "hello up there"})

	xwait(wg)

	// Recv1: further packets with the same connid are rejected with "connection closed"
	wg = &errgroup.Group{}

	gox(wg, func() {
		c := xnewconn(nl2)
		xSend(c, &proto.Ping{})
		xSend(c, &proto.Ping{})
		msg := xRecv(c)
		xverifyMsg(msg, errConnClosed)
	})

	_ = xRecv1(nl1)

	xwait(wg)

	// TODO link.Close vs Recv1
}
// Test a possible race condition between link.shutdown and conn.lightClose:
//
// link.shutdown sets link.connTab=nil under link.connMu and then iterates a
// connTab snapshot without link.connMu held. If conn.lightClose does
// conn.release() in parallel to link.shutdown() iterating connTab, they can
// both be writing to/using e.g. conn.rxdownOnce.
//
// The bug triggers under -race.
func TestLightCloseVsLinkShutdown(t *testing.T) {
	nl1, nl2 := nodeLinkPipe()
	wg := &errgroup.Group{}

	c := xnewconn(nl1)
	nl1.shutdown()
	gox(wg, func() {
		c.lightClose()
	})
	xwait(wg)

	xclose(nl1)
	xclose(nl2)
}
// ---- benchmarks ----

// rtt over chan - for comparison as base.
@@ -874,3 +987,105 @@ func BenchmarkTCPlosrho(b *testing.B) {
	c1, c2 := xtcpPipe()
	benchmarkNetConnRTT(b, c1, c2, true, true)
}
// rtt over NodeLink via Ask1/Recv1
func benchmarkLinkRTT(b *testing.B, l1, l2 *NodeLink) {
	b.ResetTimer()

	go func() {
		defer xclose(l2)

		for {
			req, err := l2.Recv1()
			if err != nil {
				switch errors.Cause(err) {
				case ErrLinkDown:
					return
				default:
					b.Fatal(err)
				}
			}

			switch msg := req.Msg.(type) {
			default:
				b.Fatalf("read -> unexpected message %T", msg)

			case *proto.GetObject:
				err = req.Reply(&proto.AnswerObject{
					Oid:        msg.Oid,
					Serial:     msg.Serial,
					DataSerial: msg.Tid,
				})
				if err != nil {
					b.Fatal(err)
				}
			}

			req.Close()
		}
	}()

	for i := 0; i < b.N; i++ {
		// NOTE keeping inside loop to simulate what happens in real Load
		get := &proto.GetObject{}
		obj := &proto.AnswerObject{}

		get.Oid = zodb.Oid(i)
		get.Serial = zodb.Tid(i + 1)
		get.Tid = zodb.Tid(i + 2)

		err := l1.Ask1(get, obj)
		if err != nil {
			b.Fatal(err)
		}

		if !(obj.Oid == get.Oid && obj.Serial == get.Serial && obj.DataSerial == get.Tid) {
			b.Fatalf("read back: %v ; requested %v", obj, get)
		}

		// XXX must be obj.Release
		obj.Data.XRelease()
	}

	xclose(l1)
}
// XXX RTT over Conn.Send/Recv (no msg encoding/decoding)
// XXX RTT over link.sendPkt/recvPkt (no conn route)

// xlinkPipe creates two links interconnected to each other via c1 and c2.
//
// XXX c1, c2 -> piper (who creates c1, c2) ?
// XXX overlap with nodeLinkPipe
func xlinkPipe(c1, c2 net.Conn) (*NodeLink, *NodeLink) {
	var l1, l2 *NodeLink

	wg := &errgroup.Group{}
	gox(wg, func() {
		l, err := _Handshake(context.Background(), c1, _LinkClient)
		exc.Raiseif(err)
		l1 = l
	})
	gox(wg, func() {
		l, err := _Handshake(context.Background(), c2, _LinkServer)
		exc.Raiseif(err)
		l2 = l
	})
	xwait(wg)

	return l1, l2
}

func BenchmarkLinkNetPipeRTT(b *testing.B) {
	c1, c2 := net.Pipe()
	l1, l2 := xlinkPipe(c1, c2)
	benchmarkLinkRTT(b, l1, l2)
}

func BenchmarkLinkTCPRTT(b *testing.B) {
	c1, c2 := xtcpPipe()
	l1, l2 := xlinkPipe(c1, c2)
	benchmarkLinkRTT(b, l1, l2)
}