diff --git a/go/neo/server/cluster_test.go b/go/neo/server/cluster_test.go index 63bc50fe3a7c49a3bde9b1f4ee58e41cdc179cb1..a1ee7ac72a3f0b906a6afbcf08cb752c1980e780 100644 --- a/go/neo/server/cluster_test.go +++ b/go/neo/server/cluster_test.go @@ -19,10 +19,10 @@ package server // test interaction between nodes import ( - //"bytes" + "bytes" "context" //"io" - //"reflect" + "reflect" "testing" //"../../neo/client" @@ -30,10 +30,13 @@ import ( //"../../zodb" "../../zodb/storage/fs1" + "../../xcommon/xnet" "../../xcommon/xnet/pipenet" "../../xcommon/xsync" "lab.nexedi.com/kirr/go123/exc" + + "fmt" ) // XXX dup from connection_test @@ -48,9 +51,103 @@ func xfs1stor(path string) *fs1.FileStorage { return zstor } + +// traceMsg represents one tracing communication +// the goroutine which produced it will wait for send on ack before continue +type traceMsg struct { + event interface {} // xnet.Trace* | ... + ack chan struct{} +} + +// TraceChecker synchronously collects and checks tracing events +// it collects events from several sources and sends them all into one channel +// for each event the goroutine which produced it will wait for ack before continue +// XXX more text XXX naming -> verifier? +type TraceChecker struct { + t *testing.T + msgch chan *traceMsg // XXX or chan traceMsg (no ptr) ? +} + +func NewTraceChecker(t *testing.T) *TraceChecker { + return &TraceChecker{t: t, msgch: make(chan *traceMsg)} +} + +// get1 gets 1 event in place and checks it has expected type +func (tc *TraceChecker) xget1(eventp interface{}) *traceMsg { + println("xget1: entry") + msg := <-tc.msgch + println("xget1: msg", msg) + revp := reflect.ValueOf(eventp) + if revp.Type().Elem() != reflect.TypeOf(msg.event) { + tc.t.Fatalf("expected %s; got %#v", revp.Elem().Type(), msg.event) + } + // TODO *event = msg.event + return msg +} + +// trace1 sends message with one tracing event to consumer +func (tc *TraceChecker) trace1(event interface{}) { + ack := make(chan struct{}) + fmt.Printf("I: %v ...", event) + println("zzz") + //panic(0) + tc.msgch <- &traceMsg{event, ack} + <-ack + fmt.Printf(" ok\n") +} + +func (tc *TraceChecker) TraceNetDial(ev *xnet.TraceDial) { tc.trace1(ev) } +func (tc *TraceChecker) TraceNetListen(ev *xnet.TraceListen) { tc.trace1(ev) } +func (tc *TraceChecker) TraceNetTx(ev *xnet.TraceTx) { tc.trace1(ev) } + + +// Expect instruct checker to expect next event to be ... +// XXX +func (tc *TraceChecker) ExpectNetDial(dst string) { + var ev *xnet.TraceDial + msg := tc.xget1(&ev) + + if ev.Dst != dst { + tc.t.Fatalf("net dial: have %v; want: %v", ev.Dst, dst) + } + + close(msg.ack) +} + +func (tc *TraceChecker) ExpectNetListen(laddr string) { + var ev *xnet.TraceListen + msg := tc.xget1(&ev) + + if ev.Laddr != laddr { + tc.t.Fatalf("net listen: have %v; want %v", ev.Laddr, laddr) + } + + println("listen: ok") + close(msg.ack) +} + +func (tc *TraceChecker) ExpectNetTx(src, dst string, pkt string) { + var ev *xnet.TraceTx + msg := tc.xget1(&ev) + + pktb := []byte(pkt) + if !(ev.Src.String() == src && + ev.Dst.String() == dst && + bytes.Equal(ev.Pkt, pktb)) { + // TODO also print all (?) previous events + tc.t.Fatalf("expect:\nhave: %s -> %s %v\nwant: %s -> %s %v", + ev.Src, ev.Dst, ev.Pkt, src, dst, pktb) + } + + close(msg.ack) +} + + // M drives cluster with 1 S through recovery -> verification -> service -> shutdown func TestMasterStorage(t *testing.T) { - net := pipenet.New("") // test network + tc := NewTraceChecker(t) + net := xnet.NetTrace(pipenet.New(""), tc) // test network + Maddr := "0" Saddr := "1" @@ -64,7 +161,11 @@ func TestMasterStorage(t *testing.T) { _ = err // XXX }) + println("222") // expect: + //tc.ExpectNetListen("0") + tc.ExpectNetDial("0") + println("333") // M.clusterState <- RECOVERY // M.nodeTab <- Node(M) @@ -79,6 +180,9 @@ func TestMasterStorage(t *testing.T) { // expect: // M <- S .? RequestIdentification{...} + TODO test ID rejects + tc.ExpectNetTx("2c", "2s", "\x00\x00\x00\x01") // handshake + tc.ExpectNetTx("2s", "2c", "\x00\x00\x00\x01") + // M -> S .? AcceptIdentification{...} // M.nodeTab <- Node(S) XXX order can be racy? // S.nodeTab <- Node(M) XXX order can be racy? diff --git a/go/xcommon/xnet/trace.go b/go/xcommon/xnet/trace.go index 49b9248b5747172bc89e079cb9b7395b526c65d9..e723e57fb15b12b541c404faa1dbe94bf4382aed 100644 --- a/go/xcommon/xnet/trace.go +++ b/go/xcommon/xnet/trace.go @@ -34,8 +34,25 @@ import ( // only Tx events are traced: // - because Write, contrary to Read, never writes partial data on non-error // - because in case of pipenet tracing writes only is enough to get whole network exchange picture -func NetTrace(inner Network, trace func (t *TraceTx)) Network { - return &netTrace{inner, trace} +func NetTrace(inner Network, tracer Tracer) Network { + return &netTrace{inner, tracer} +} + +// Tracer is the interface that needs to be implemented by network trace receivers +type Tracer interface { + TraceNetDial(*TraceDial) + TraceNetListen(*TraceListen) + TraceNetTx(*TraceTx) +} + +// TraceDial is event corresponding to network dialing +type TraceDial struct { + Dst string +} + +// TraceListen is event corresponding to network listening +type TraceListen struct { + Laddr string } // TraceTx is event corresponding to network transmission @@ -47,8 +64,8 @@ type TraceTx struct { // netTrace wraps underlying Network such that whenever a connection is created // it is wrapped with traceConn type netTrace struct { - inner Network - trace func(t *TraceTx) + inner Network + tracer Tracer } func (nt *netTrace) Network() string { @@ -56,19 +73,23 @@ func (nt *netTrace) Network() string { } func (nt *netTrace) Dial(ctx context.Context, addr string) (net.Conn, error) { + nt.tracer.TraceNetDial(&TraceDial{Dst: addr}) c, err := nt.inner.Dial(ctx, addr) if err != nil { return nil, err } return &traceConn{nt, c}, nil + // XXX +TraceNetDialPost ? } func (nt *netTrace) Listen(laddr string) (net.Listener, error) { + nt.tracer.TraceNetListen(&TraceListen{Laddr: laddr}) l, err := nt.inner.Listen(laddr) if err != nil { return nil, err } return &netTraceListener{nt, l}, nil + // XXX +TraceNetListenPost ? } // netTraceListener wraps net.Listener to wrap accepted connections with traceConn @@ -92,7 +113,7 @@ type traceConn struct { } func (tc *traceConn) Write(b []byte) (int, error) { - t := &TraceTx{Src: tc.LocalAddr(), Dst: tc.RemoteAddr(), Pkt: b} - tc.nt.trace(t) + tc.nt.tracer.TraceNetTx(&TraceTx{Src: tc.LocalAddr(), Dst: tc.RemoteAddr(), Pkt: b}) return tc.Conn.Write(b) + // XXX +TraceNetTxPost ? }