Commit 92fe5a52 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent df89fa96
...@@ -652,6 +652,7 @@ func (link *NodeLink) errAcceptShutdownAX() error { ...@@ -652,6 +652,7 @@ func (link *NodeLink) errAcceptShutdownAX() error {
return ErrLinkNoListen return ErrLinkNoListen
default: default:
// XXX do the same as in errRecvShutdown (check link.errRecv)
return ErrLinkDown return ErrLinkDown
} }
} }
...@@ -890,6 +891,12 @@ func (nl *NodeLink) serveRecv() { ...@@ -890,6 +891,12 @@ func (nl *NodeLink) serveRecv() {
// XXX goes away in favour of .rxdownFlag; reasons // XXX goes away in favour of .rxdownFlag; reasons
// - no need to reallocate rxdown for light conn // - no need to reallocate rxdown for light conn
// - no select // - no select
//
// XXX review synchronization via flags for correctness (e.g.
// if both G were on the same runqueue, spinning in G1 will
// prevent G2 progress)
//
// XXX maybe we'll need select if we add ctx into Send/Recv.
// don't even try `conn.rxq <- ...` if conn.rxdown is ready // don't even try `conn.rxq <- ...` if conn.rxdown is ready
// ( else since select is picking random ready variant Recv/serveRecv // ( else since select is picking random ready variant Recv/serveRecv
......
...@@ -80,7 +80,7 @@ func gox(wg interface { Go(func() error) }, xf func()) { ...@@ -80,7 +80,7 @@ func gox(wg interface { Go(func() error) }, xf func()) {
wg.Go(exc.Funcx(xf)) wg.Go(exc.Funcx(xf))
} }
// xlinkError verifies that err is *LinkError and returns err.Err // xlinkError verifies that err is *LinkError and returns err.Err .
func xlinkError(err error) error { func xlinkError(err error) error {
le, ok := err.(*LinkError) le, ok := err.(*LinkError)
if !ok { if !ok {
...@@ -89,7 +89,7 @@ func xlinkError(err error) error { ...@@ -89,7 +89,7 @@ func xlinkError(err error) error {
return le.Err return le.Err
} }
// xconnError verifies that err is *ConnError and returns err.Err // xconnError verifies that err is *ConnError and returns err.Err .
func xconnError(err error) error { func xconnError(err error) error {
ce, ok := err.(*ConnError) ce, ok := err.(*ConnError)
if !ok { if !ok {
...@@ -98,7 +98,7 @@ func xconnError(err error) error { ...@@ -98,7 +98,7 @@ func xconnError(err error) error {
return ce.Err return ce.Err
} }
// Prepare pktBuf with content // Prepare pktBuf with content.
func _mkpkt(connid uint32, msgcode uint16, payload []byte) *pktBuf { func _mkpkt(connid uint32, msgcode uint16, payload []byte) *pktBuf {
pkt := &pktBuf{make([]byte, proto.PktHeaderLen + len(payload))} pkt := &pktBuf{make([]byte, proto.PktHeaderLen + len(payload))}
h := pkt.Header() h := pkt.Header()
...@@ -114,7 +114,7 @@ func (c *Conn) mkpkt(msgcode uint16, payload []byte) *pktBuf { ...@@ -114,7 +114,7 @@ func (c *Conn) mkpkt(msgcode uint16, payload []byte) *pktBuf {
return _mkpkt(c.connId, msgcode, payload) return _mkpkt(c.connId, msgcode, payload)
} }
// Verify pktBuf is as expected // Verify pktBuf is as expected.
func xverifyPkt(pkt *pktBuf, connid uint32, msgcode uint16, payload []byte) { func xverifyPkt(pkt *pktBuf, connid uint32, msgcode uint16, payload []byte) {
errv := xerr.Errorv{} errv := xerr.Errorv{}
h := pkt.Header() h := pkt.Header()
...@@ -136,14 +136,15 @@ func xverifyPkt(pkt *pktBuf, connid uint32, msgcode uint16, payload []byte) { ...@@ -136,14 +136,15 @@ func xverifyPkt(pkt *pktBuf, connid uint32, msgcode uint16, payload []byte) {
exc.Raiseif( errv.Err() ) exc.Raiseif( errv.Err() )
} }
// Verify pktBuf to match expected message // Verify pktBuf to match expected message.
func xverifyPktMsg(pkt *pktBuf, connid uint32, msg proto.Msg) { func xverifyPktMsg(pkt *pktBuf, connid uint32, msg proto.Msg) {
data := make([]byte, msg.NEOMsgEncodedLen()) data := make([]byte, msg.NEOMsgEncodedLen())
msg.NEOMsgEncode(data) msg.NEOMsgEncode(data)
xverifyPkt(pkt, connid, msg.NEOMsgCode(), data) xverifyPkt(pkt, connid, msg.NEOMsgCode(), data)
} }
// delay a bit // delay a bit.
//
// needed e.g. to test Close interaction with waiting read or write // needed e.g. to test Close interaction with waiting read or write
// (we cannot easily sync and make sure e.g. read is started and became asleep) // (we cannot easily sync and make sure e.g. read is started and became asleep)
// //
...@@ -709,7 +710,7 @@ func TestRecv1Mode(t *testing.T) { ...@@ -709,7 +710,7 @@ func TestRecv1Mode(t *testing.T) {
// conn.release() in parallel to link.shutdown() iterating connTab they can be // conn.release() in parallel to link.shutdown() iterating connTab they can be
// both writing/using e.g. conn.rxdownOnce. // both writing/using e.g. conn.rxdownOnce.
// //
// bug triggers under -race // bug triggers under -race.
func TestLightCloseVsLinkShutdown(t *testing.T) { func TestLightCloseVsLinkShutdown(t *testing.T) {
nl1, nl2 := nodeLinkPipe() nl1, nl2 := nodeLinkPipe()
wg := &errgroup.Group{} wg := &errgroup.Group{}
...@@ -727,7 +728,7 @@ func TestLightCloseVsLinkShutdown(t *testing.T) { ...@@ -727,7 +728,7 @@ func TestLightCloseVsLinkShutdown(t *testing.T) {
// ---- benchmarks ---- // ---- benchmarks ----
// rtt over chan - for comparision as base // rtt over chan - for comparison as base.
func benchmarkChanRTT(b *testing.B, c12, c21 chan byte) { func benchmarkChanRTT(b *testing.B, c12, c21 chan byte) {
go func() { go func() {
for { for {
...@@ -760,7 +761,7 @@ func BenchmarkBufChanRTT(b *testing.B) { ...@@ -760,7 +761,7 @@ func BenchmarkBufChanRTT(b *testing.B) {
benchmarkChanRTT(b, make(chan byte, 1), make(chan byte, 1)) benchmarkChanRTT(b, make(chan byte, 1), make(chan byte, 1))
} }
// rtt over (acceptq, rxq) & ack chans - base comparision for link.Accept + conn.Recv // rtt over (acceptq, rxq) & ack channels - base comparison for link.Accept + conn.Recv .
func BenchmarkBufChanAXRXRTT(b *testing.B) { func BenchmarkBufChanAXRXRTT(b *testing.B) {
axq := make(chan chan byte) axq := make(chan chan byte)
ack := make(chan byte) ack := make(chan byte)
...@@ -797,8 +798,8 @@ func BenchmarkBufChanAXRXRTT(b *testing.B) { ...@@ -797,8 +798,8 @@ func BenchmarkBufChanAXRXRTT(b *testing.B) {
var gosched = make(chan struct{}) var gosched = make(chan struct{})
// GoschedLocal is like runtime.Gosched but queus current goroutine on P-local // GoschedLocal is like runtime.Gosched but queues current goroutine on P-local
// runqueue instead of global runqueu. // runqueue instead of global runqueue.
// FIXME does not work - in the end goroutines appear on different Ps/Ms // FIXME does not work - in the end goroutines appear on different Ps/Ms
func GoschedLocal() { func GoschedLocal() {
go func() { go func() {
...@@ -941,7 +942,7 @@ func benchmarkNetConnRTT(b *testing.B, c1, c2 net.Conn, serveRecv bool, ghandoff ...@@ -941,7 +942,7 @@ func benchmarkNetConnRTT(b *testing.B, c1, c2 net.Conn, serveRecv bool, ghandoff
xclose(c1) xclose(c1)
} }
// rtt over net.Pipe - for comparision as base // rtt over net.Pipe - for comparison as base.
func BenchmarkNetPipeRTT(b *testing.B) { func BenchmarkNetPipeRTT(b *testing.B) {
c1, c2 := net.Pipe() c1, c2 := net.Pipe()
benchmarkNetConnRTT(b, c1, c2, false, false) benchmarkNetConnRTT(b, c1, c2, false, false)
...@@ -957,7 +958,7 @@ func BenchmarkNetPipeRTTsrho(b *testing.B) { ...@@ -957,7 +958,7 @@ func BenchmarkNetPipeRTTsrho(b *testing.B) {
benchmarkNetConnRTT(b, c1, c2, true, true) benchmarkNetConnRTT(b, c1, c2, true, true)
} }
// xtcpPipe creates two TCP connections connected to each other via loopback // xtcpPipe creates two TCP connections connected to each other via loopback.
func xtcpPipe() (*net.TCPConn, *net.TCPConn) { func xtcpPipe() (*net.TCPConn, *net.TCPConn) {
// NOTE go sets TCP_NODELAY by default for TCP sockets // NOTE go sets TCP_NODELAY by default for TCP sockets
l, err := net.Listen("tcp", "localhost:") l, err := net.Listen("tcp", "localhost:")
...@@ -973,7 +974,7 @@ func xtcpPipe() (*net.TCPConn, *net.TCPConn) { ...@@ -973,7 +974,7 @@ func xtcpPipe() (*net.TCPConn, *net.TCPConn) {
return c1.(*net.TCPConn), c2.(*net.TCPConn) return c1.(*net.TCPConn), c2.(*net.TCPConn)
} }
// rtt over TCP/loopback - for comparision as base // rtt over TCP/loopback - for comparison as base.
func BenchmarkTCPlo(b *testing.B) { func BenchmarkTCPlo(b *testing.B) {
c1, c2 := xtcpPipe() c1, c2 := xtcpPipe()
benchmarkNetConnRTT(b, c1, c2, false, false) benchmarkNetConnRTT(b, c1, c2, false, false)
...@@ -1056,7 +1057,8 @@ func benchmarkLinkRTT(b *testing.B, l1, l2 *NodeLink) { ...@@ -1056,7 +1057,8 @@ func benchmarkLinkRTT(b *testing.B, l1, l2 *NodeLink) {
// XXX RTT over Conn.Send/Recv (no msg encoding/decoding) // XXX RTT over Conn.Send/Recv (no msg encoding/decoding)
// XXX RTT over link.sendPkt/recvPkt (no conn route) // XXX RTT over link.sendPkt/recvPkt (no conn route)
// xlinkPipe creates two links interconnected to each other via c1 and c2 // xlinkPipe creates two links interconnected to each other via c1 and c2.
//
// XXX c1, c2 -> piper (who creates c1, c2) ? // XXX c1, c2 -> piper (who creates c1, c2) ?
// XXX overlap with nodeLinkPipe // XXX overlap with nodeLinkPipe
func xlinkPipe(c1, c2 net.Conn) (*NodeLink, *NodeLink) { func xlinkPipe(c1, c2 net.Conn) (*NodeLink, *NodeLink) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment