Commit 9d3a80ae authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 06e18013
......@@ -58,7 +58,12 @@ import (
// ---- events used in tests ----
// xnet.TraceConnect
// xnet.TraceListen
// event: node starts listening
type eventNetListen struct {
Laddr string
}
// event: tx via neo.Conn
type eventNeoSend struct {
......@@ -100,24 +105,75 @@ type EventRouter struct {
mu sync.Mutex
defaultq *tsync.SyncChan
// events specific to particular node - e.g. node starts listening
byNode map[string /*host*/]*tsync.SyncChan
//byLink
}
func NewEventRouter() *EventRouter {
return &EventRouter{defaultq: tsync.NewSyncChan()}
return &EventRouter{
defaultq: tsync.NewSyncChan("default"),
byNode: make(map[string]*tsync.SyncChan),
}
}
func (r *EventRouter) AllRoutes() []*tsync.SyncChan {
rtset := map[*tsync.SyncChan]int{}
rtset[r.defaultq] = 1
for _, dst := range r.byNode {
rtset[dst] = 1
}
// XXX byLink
var rtv []*tsync.SyncChan
for dst := range rtset {
rtv = append(rtv, dst)
}
return rtv
}
// hostport splits addr of for "host:port" into host and port.
//
// if the address has not the specified form returned are:
// - host = addr
// - port = ""
func hostport(addr string) (host string, port string) {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return addr, ""
}
return host, port
}
func (r *EventRouter) Route(event interface{}) *tsync.SyncChan {
r.mu.Lock()
defer r.mu.Unlock()
switch event.(type) {
// ...
switch ev := event.(type) {
case *eventNetListen:
host, _ := hostport(ev.Laddr)
dst := r.byNode[host]
if dst != nil {
return dst
}
}
return r.defaultq // use default XXX or better nil?
}
func (r *EventRouter) BranchNode(host string, dst *tsync.SyncChan) {
r.mu.Lock()
defer r.mu.Unlock()
if _, already := r.byNode[host]; already {
panic(fmt.Sprintf("event router: node %q already branched", host))
}
r.byNode[host] = dst
}
// ---- trace probes, etc -> events -> dispatcher ----
// TraceCollector connects to NEO-specific trace points via probes and sends events to dispatcher.
......@@ -148,7 +204,11 @@ func (t *TraceCollector) Detach() {
}
func (t *TraceCollector) TraceNetConnect(ev *xnet.TraceConnect) { t.d.Dispatch(ev) }
func (t *TraceCollector) TraceNetListen(ev *xnet.TraceListen) { t.d.Dispatch(ev) }
func (t *TraceCollector) TraceNetListen(ev *xnet.TraceListen) {
t.d.Dispatch(&eventNetListen{Laddr: ev.Laddr.String()})
}
func (t *TraceCollector) TraceNetTx(ev *xnet.TraceTx) {} // we use traceNeoMsgSend instead
func (t *TraceCollector) traceNeoMsgSendPre(l *neo.NodeLink, connID uint32, msg neo.Msg) {
......@@ -181,7 +241,7 @@ func TestMasterStorage(t *testing.T) {
defer tracer.Detach()
// by default events go to g
g := tsync.NewEventChecker(t, rt.defaultq)
g := tsync.NewEventChecker(t, dispatch, rt.defaultq)
......@@ -212,8 +272,12 @@ func TestMasterStorage(t *testing.T) {
return &xnet.TraceConnect{Src: xaddr(src), Dst: xaddr(dst), Dialed: dialed}
}
netlisten := func(laddr string) *xnet.TraceListen {
return &xnet.TraceListen{Laddr: xaddr(laddr)}
//netlisten := func(laddr string) *xnet.TraceListen {
// return &xnet.TraceListen{Laddr: xaddr(laddr)}
//}
netlisten := func(laddr string) *eventNetListen {
return &eventNetListen{Laddr: laddr}
}
// shortcut for net tx event over nodelink connection
......@@ -245,6 +309,13 @@ func TestMasterStorage(t *testing.T) {
Shost := xnet.NetTrace(net.Host("s"), tracer)
// Chost := xnet.NetTrace(net.Host("c"), tracer)
cm := tsync.NewSyncChan("m.local") // trace of events local to M
cs := tsync.NewSyncChan("s.local") // trace of events local to S XXX with cause root also on S
tm := tsync.NewEventChecker(t, dispatch, cm)
ts := tsync.NewEventChecker(t, dispatch, cs)
rt.BranchNode("m", cm)
rt.BranchNode("s", cs)
gwg := &errgroup.Group{}
// start master
......@@ -258,13 +329,6 @@ func TestMasterStorage(t *testing.T) {
exc.Raiseif(err)
})
// M starts listening
g.Expect(netlisten("m:1"))
g.Expect(node(M.node, "m:1", neo.MASTER, 1, neo.RUNNING, neo.IdTimeNone))
g.Expect(clusterState(&M.node.ClusterState, neo.ClusterRecovering))
// TODO create C; C tries connect to master - rejected ("not yet operational")
// start storage
zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs")
S := NewStorage("abc1", "m:1", ":1", Shost, zstor)
......@@ -275,8 +339,16 @@ func TestMasterStorage(t *testing.T) {
exc.Raiseif(err)
})
// M starts listening
tm.Expect(netlisten("m:1"))
tm.Expect(node(M.node, "m:1", neo.MASTER, 1, neo.RUNNING, neo.IdTimeNone))
tm.Expect(clusterState(&M.node.ClusterState, neo.ClusterRecovering))
// TODO create C; C tries connect to master - rejected ("not yet operational")
// S starts listening
g.Expect(netlisten("s:1"))
ts.Expect(netlisten("s:1"))
// S connects M
g.Expect(netconnect("s:2", "m:2", "m:1"))
......
......@@ -18,7 +18,7 @@
// See https://www.nexedi.com/licensing for rationale and options.
// Package tsync provides infrastructure for synchronous testing based on program tracing.
// XXX naming -> ttest?
// XXX naming -> ttest? tracetest?
//
// A serial system can be verified by checking that its execution produces
// expected serial stream of events. But concurrent systems cannot be verified
......@@ -52,8 +52,12 @@
package tsync
import (
"fmt"
"sort"
"strings"
"reflect"
"testing"
"time"
"github.com/kylelemons/godebug/pretty"
)
......@@ -67,13 +71,19 @@ import (
// It is safe to use SyncChan from multiple goroutines simultaneously.
type SyncChan struct {
msgq chan *SyncMsg
name string
}
// Send sends event to a consumer and waits for ack.
//
// if main testing goroutine detects any problem Send panics. XXX
func (ch *SyncChan) Send(event interface{}) {
ack := make(chan struct{})
ack := make(chan bool)
ch.msgq <- &SyncMsg{event, ack}
<-ack
ok := <-ack
if !ok {
panic(fmt.Sprintf("%s: send: deadlock", ch.name))
}
}
// Recv receives message from a producer.
......@@ -89,17 +99,17 @@ func (ch *SyncChan) Recv() *SyncMsg {
// The goroutine which sent the message will wait for Ack before continue.
type SyncMsg struct {
Event interface {}
ack chan<- struct{}
ack chan<- bool
}
// Ack acknowledges the event was processed and unblocks producer goroutine.
func (m *SyncMsg) Ack() {
close(m.ack)
m.ack <- true
}
// NewSyncChan creates new SyncChan channel.
func NewSyncChan() *SyncChan {
return &SyncChan{msgq: make(chan *SyncMsg)}
func NewSyncChan(name string) *SyncChan {
return &SyncChan{msgq: make(chan *SyncMsg), name: name}
}
......@@ -107,25 +117,36 @@ func NewSyncChan() *SyncChan {
// EventChecker is testing utility to verify that sequence of events coming
// from a single SyncChan are as expected.
// from a single SyncChan is as expected.
type EventChecker struct {
t testing.TB
in *SyncChan
dispatch *EventDispatcher
}
// NewEventChecker constructs new EventChecker that will retrieve events from
// `in` and use `t` for tests reporting.
func NewEventChecker(t testing.TB, in *SyncChan) *EventChecker {
return &EventChecker{t: t, in: in}
//
// XXX -> dispatch.NewChecker() ?
func NewEventChecker(t testing.TB, dispatch *EventDispatcher, in *SyncChan) *EventChecker {
return &EventChecker{t: t, in: in, dispatch: dispatch}
}
// get1 gets 1 event in place and checks it has expected type
//
// if checks do not pass - fatal testing error is raised
// XXX why eventp, not just event here?
func (evc *EventChecker) xget1(eventp interface{}) *SyncMsg {
evc.t.Helper()
// XXX handle deadlock timeout
msg := evc.in.Recv()
var msg *SyncMsg
select {
case msg = <-evc.in.msgq: // unwrapped Recv
// ok
case <-time.After(2*time.Second): // XXX timeout hardcoded
evc.deadlock(eventp)
}
reventp := reflect.ValueOf(eventp)
if reventp.Type().Elem() != reflect.TypeOf(msg.Event) {
......@@ -184,6 +205,54 @@ func (evc *EventChecker) ExpectNoACK(expected interface{}) *SyncMsg {
return msg
}
// deadlock reports diagnostic when retrieving event from .in timed out.
//
// timing out on recv means there is a deadlock either if no event was sent at
// all, or some other event was sent to another channel/checker.
//
// report the full picture - what was expected and what was sent where.
func (evc *EventChecker) deadlock(eventp interface{}) {
evc.t.Helper()
rt := evc.dispatch.rt
dstv := rt.AllRoutes()
bad := fmt.Sprintf("%s: deadlock waiting for %T\n", evc.in.name, eventp)
type sendInfo struct{dst *SyncChan; event interface{}}
var sendv []sendInfo
for _, dst := range dstv {
// check whether someone is sending on a dst without blocking.
// if yes - report to sender there is a problem - so it can cancel its task.
select {
case msg := <-dst.msgq:
sendv = append(sendv, sendInfo{dst, msg.Event})
//msg.ack <- false
default:
}
// in any case close channel where futer Sends may arrive so that will panic too.
close(dst.msgq)
}
// order channels by name
sort.Slice(sendv, func(i, j int) bool {
return strings.Compare(sendv[i].dst.name, sendv[j].dst.name) < 0
})
if len(sendv) == 0 {
bad += fmt.Sprintf("noone is sending\n")
} else {
bad += fmt.Sprintf("there are %d sender(s) on other channels:\n", len(sendv))
for _, __ := range sendv {
bad += fmt.Sprintf("%s:\t%T %v\n", __.dst.name, __.event, __.event)
}
}
evc.t.Fatal(bad)
}
// XXX goes away? (if there is no happens-before for events - just check them one by one in dedicated goroutines ?)
/*
// ExpectPar asks checker to expect next series of events to be from eventExpectV in no particular order
......@@ -220,15 +289,14 @@ loop:
// ----------------------------------------
// EventRouter is the interface used for routing events to appropriate output SyncChan.
//
// It should be safe to use EventRouter from multiple goroutines simultaneously.
type EventRouter interface {
// Route should return appropriate destination for event.
//
// If nil is returned default destination is used. // XXX ok?
//
// It should be safe to call Route from multiple goroutines simultaneously.
Route(event interface{}) *SyncChan
// AllDst() []*SyncChan
// AllRoutes should return all routing destinations.
AllRoutes() []*SyncChan
}
// EventDispatcher dispatches events to appropriate SyncChan for checking
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment