Commit fc2f4a1a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c5f6dc77
...@@ -41,7 +41,7 @@ import ( ...@@ -41,7 +41,7 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/xtracing/tsync" "lab.nexedi.com/kirr/neo/go/xcommon/xtracing/tracetest"
"lab.nexedi.com/kirr/go123/exc" "lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/tracing" "lab.nexedi.com/kirr/go123/tracing"
...@@ -116,14 +116,14 @@ func masterStartReady(where string, ready bool) *eventMStartReady { ...@@ -116,14 +116,14 @@ func masterStartReady(where string, ready bool) *eventMStartReady {
type EventRouter struct { type EventRouter struct {
mu sync.Mutex mu sync.Mutex
defaultq *tsync.SyncChan defaultq *tracetest.SyncChan
// events specific to particular node - e.g. node starts listening, // events specific to particular node - e.g. node starts listening,
// state on that node changes, etc... // state on that node changes, etc...
byNode map[string /*host*/]*tsync.SyncChan byNode map[string /*host*/]*tracetest.SyncChan
// state on host changes. Takes precendece over byNode. // state on host changes. Takes precendece over byNode.
byState map[string /*host*/]*tsync.SyncChan byState map[string /*host*/]*tracetest.SyncChan
// event on a-b link // event on a-b link
byLink map[string /*host-host*/]*linkDst byLink map[string /*host-host*/]*linkDst
...@@ -133,16 +133,16 @@ type EventRouter struct { ...@@ -133,16 +133,16 @@ type EventRouter struct {
func NewEventRouter() *EventRouter { func NewEventRouter() *EventRouter {
return &EventRouter{ return &EventRouter{
defaultq: tsync.NewSyncChan("default"), defaultq: tracetest.NewSyncChan("default"),
byNode: make(map[string]*tsync.SyncChan), byNode: make(map[string]*tracetest.SyncChan),
byState: make(map[string]*tsync.SyncChan), byState: make(map[string]*tracetest.SyncChan),
byLink: make(map[string]*linkDst), byLink: make(map[string]*linkDst),
connected: make(map[string]bool), connected: make(map[string]bool),
} }
} }
func (r *EventRouter) AllRoutes() []*tsync.SyncChan { func (r *EventRouter) AllRoutes() []*tracetest.SyncChan {
rtset := map[*tsync.SyncChan]int{} rtset := map[*tracetest.SyncChan]int{}
rtset[r.defaultq] = 1 rtset[r.defaultq] = 1
for _, dst := range r.byNode { for _, dst := range r.byNode {
rtset[dst] = 1 rtset[dst] = 1
...@@ -155,7 +155,7 @@ func (r *EventRouter) AllRoutes() []*tsync.SyncChan { ...@@ -155,7 +155,7 @@ func (r *EventRouter) AllRoutes() []*tsync.SyncChan {
rtset[ldst.b] = 1 rtset[ldst.b] = 1
} }
var rtv []*tsync.SyncChan var rtv []*tracetest.SyncChan
for dst := range rtset { for dst := range rtset {
rtv = append(rtv, dst) rtv = append(rtv, dst)
} }
...@@ -184,7 +184,7 @@ func host(addr string) string { ...@@ -184,7 +184,7 @@ func host(addr string) string {
} }
// Route routes events according to rules specified via Branch*() // Route routes events according to rules specified via Branch*()
func (r *EventRouter) Route(event interface{}) (dst *tsync.SyncChan) { func (r *EventRouter) Route(event interface{}) (dst *tracetest.SyncChan) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
...@@ -258,7 +258,7 @@ func (r *EventRouter) Route(event interface{}) (dst *tsync.SyncChan) { ...@@ -258,7 +258,7 @@ func (r *EventRouter) Route(event interface{}) (dst *tsync.SyncChan) {
} }
// routeState routes event corresponding to state change on host // routeState routes event corresponding to state change on host
func (r *EventRouter) routeState(host string) (dst *tsync.SyncChan) { func (r *EventRouter) routeState(host string) (dst *tracetest.SyncChan) {
// lookup dst by state rules // lookup dst by state rules
dst = r.byState[host] dst = r.byState[host]
if dst != nil { if dst != nil {
...@@ -270,7 +270,7 @@ func (r *EventRouter) routeState(host string) (dst *tsync.SyncChan) { ...@@ -270,7 +270,7 @@ func (r *EventRouter) routeState(host string) (dst *tsync.SyncChan) {
} }
// BranchNode branches events corresponding to host. // BranchNode branches events corresponding to host.
func (r *EventRouter) BranchNode(host string, dst *tsync.SyncChan) { func (r *EventRouter) BranchNode(host string, dst *tracetest.SyncChan) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
...@@ -282,7 +282,7 @@ func (r *EventRouter) BranchNode(host string, dst *tsync.SyncChan) { ...@@ -282,7 +282,7 @@ func (r *EventRouter) BranchNode(host string, dst *tsync.SyncChan) {
} }
// BranchState branches events corresponding to state changes on host. // BranchState branches events corresponding to state changes on host.
func (r *EventRouter) BranchState(host string, dst *tsync.SyncChan) { func (r *EventRouter) BranchState(host string, dst *tracetest.SyncChan) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
...@@ -299,7 +299,7 @@ func (r *EventRouter) BranchState(host string, dst *tsync.SyncChan) { ...@@ -299,7 +299,7 @@ func (r *EventRouter) BranchState(host string, dst *tsync.SyncChan) {
// //
// Event with networking cause root coming from a go to dsta, and with // Event with networking cause root coming from a go to dsta, and with
// networking cause root coming from b - go to dstb. // networking cause root coming from b - go to dstb.
func (r *EventRouter) BranchLink(link string, dsta, dstb *tsync.SyncChan) { func (r *EventRouter) BranchLink(link string, dsta, dstb *tracetest.SyncChan) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
...@@ -313,8 +313,8 @@ func (r *EventRouter) BranchLink(link string, dsta, dstb *tsync.SyncChan) { ...@@ -313,8 +313,8 @@ func (r *EventRouter) BranchLink(link string, dsta, dstb *tsync.SyncChan) {
} }
type linkDst struct { type linkDst struct {
a *tsync.SyncChan // net cause was on dialer a *tracetest.SyncChan // net cause was on dialer
b *tsync.SyncChan // net cause was on listener b *tracetest.SyncChan // net cause was on listener
} }
// ---- trace probes, etc -> events -> dispatcher ---- // ---- trace probes, etc -> events -> dispatcher ----
...@@ -322,14 +322,14 @@ type linkDst struct { ...@@ -322,14 +322,14 @@ type linkDst struct {
// TraceCollector connects to NEO-specific trace points via probes and sends events to dispatcher. // TraceCollector connects to NEO-specific trace points via probes and sends events to dispatcher.
type TraceCollector struct { type TraceCollector struct {
pg *tracing.ProbeGroup pg *tracing.ProbeGroup
d *tsync.EventDispatcher d interface { Dispatch(interface{}) }
node2Name map[*NodeApp]string node2Name map[*NodeApp]string
nodeTab2Owner map[*NodeTable]string nodeTab2Owner map[*NodeTable]string
clusterState2Owner map[*proto.ClusterState]string clusterState2Owner map[*proto.ClusterState]string
} }
func NewTraceCollector(dispatch *tsync.EventDispatcher) *TraceCollector { func NewTraceCollector(dispatch interface { Dispatch(interface{}) }) *TraceCollector {
return &TraceCollector{ return &TraceCollector{
pg: &tracing.ProbeGroup{}, pg: &tracing.ProbeGroup{},
d: dispatch, d: dispatch,
...@@ -415,7 +415,7 @@ func (t *TraceCollector) traceMasterStartReady(m *Master, ready bool) { ...@@ -415,7 +415,7 @@ func (t *TraceCollector) traceMasterStartReady(m *Master, ready bool) {
// M drives cluster with 1 S & C through recovery -> verification -> service -> shutdown // M drives cluster with 1 S & C through recovery -> verification -> service -> shutdown
func TestMasterStorage(t *testing.T) { func TestMasterStorage(t *testing.T) {
rt := NewEventRouter() rt := NewEventRouter()
dispatch := tsync.NewEventDispatcher(rt) dispatch := tracetest.NewEventDispatcher(rt)
tracer := NewTraceCollector(dispatch) tracer := NewTraceCollector(dispatch)
net := pipenet.New("testnet") // test network net := pipenet.New("testnet") // test network
...@@ -423,11 +423,6 @@ func TestMasterStorage(t *testing.T) { ...@@ -423,11 +423,6 @@ func TestMasterStorage(t *testing.T) {
tracer.Attach() tracer.Attach()
defer tracer.Detach() defer tracer.Detach()
// by default events go to g
//g := tsync.NewEventChecker(t, dispatch, rt.defaultq)
// shortcut for addresses // shortcut for addresses
xaddr := func(addr string) *pipenet.Addr { xaddr := func(addr string) *pipenet.Addr {
a, err := net.ParseAddr(addr) a, err := net.ParseAddr(addr)
...@@ -483,23 +478,23 @@ func TestMasterStorage(t *testing.T) { ...@@ -483,23 +478,23 @@ func TestMasterStorage(t *testing.T) {
Shost := xnet.NetTrace(net.Host("s"), tracer) Shost := xnet.NetTrace(net.Host("s"), tracer)
Chost := xnet.NetTrace(net.Host("c"), tracer) Chost := xnet.NetTrace(net.Host("c"), tracer)
cM := tsync.NewSyncChan("m.main") // trace of events local to M cM := tracetest.NewSyncChan("m.main") // trace of events local to M
cS := tsync.NewSyncChan("s.main") // trace of events local to S XXX with cause root also on S cS := tracetest.NewSyncChan("s.main") // trace of events local to S XXX with cause root also on S
// cC := tsync.NewSyncChan("c.main") // cC := tracetest.NewSyncChan("c.main")
cMS := tsync.NewSyncChan("m-s") // trace of events with cause root being m -> s send cMS := tracetest.NewSyncChan("m-s") // trace of events with cause root being m -> s send
cSM := tsync.NewSyncChan("s-m") // trace of events with cause root being s -> m send cSM := tracetest.NewSyncChan("s-m") // trace of events with cause root being s -> m send
cMC := tsync.NewSyncChan("m-c") // ----//---- m -> c cMC := tracetest.NewSyncChan("m-c") // ----//---- m -> c
cCM := tsync.NewSyncChan("c-m") // ----//---- c -> m cCM := tracetest.NewSyncChan("c-m") // ----//---- c -> m
cCS := tsync.NewSyncChan("c-s") // ----//---- c -> s cCS := tracetest.NewSyncChan("c-s") // ----//---- c -> s
tM := tsync.NewEventChecker(t, dispatch, cM) tM := tracetest.NewEventChecker(t, dispatch, cM)
tS := tsync.NewEventChecker(t, dispatch, cS) tS := tracetest.NewEventChecker(t, dispatch, cS)
// tC := tsync.NewEventChecker(t, dispatch, cC) // XXX no need // tC := tracetest.NewEventChecker(t, dispatch, cC) // XXX no need
tMS := tsync.NewEventChecker(t, dispatch, cMS) tMS := tracetest.NewEventChecker(t, dispatch, cMS)
tSM := tsync.NewEventChecker(t, dispatch, cSM) tSM := tracetest.NewEventChecker(t, dispatch, cSM)
tMC := tsync.NewEventChecker(t, dispatch, cMC) tMC := tracetest.NewEventChecker(t, dispatch, cMC)
tCM := tsync.NewEventChecker(t, dispatch, cCM) tCM := tracetest.NewEventChecker(t, dispatch, cCM)
tCS := tsync.NewEventChecker(t, dispatch, cCS) tCS := tracetest.NewEventChecker(t, dispatch, cCS)
rt.BranchNode("m", cM) rt.BranchNode("m", cM)
...@@ -910,7 +905,18 @@ func TestMasterStorage(t *testing.T) { ...@@ -910,7 +905,18 @@ func TestMasterStorage(t *testing.T) {
} }
/* // dispatch1 dispatched directly to single output channel
//
// XXX hack - better we don't need it.
// XXX -> with testenv.MkCluster() we won't need it
type tdispatch1 struct {
outch *tracetest.SyncChan
}
func (d tdispatch1) Dispatch(event interface{}) {
d.outch.Send(event)
}
func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit func(xcload1 func())) { func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit func(xcload1 func())) {
// create test cluster <- XXX factor to utility func // create test cluster <- XXX factor to utility func
zstor := xfs1stor("../zodb/storage/fs1/testdata/1.fs") zstor := xfs1stor("../zodb/storage/fs1/testdata/1.fs")
...@@ -924,12 +930,13 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f ...@@ -924,12 +930,13 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f
M := NewMaster("abc1", "", Mnet) M := NewMaster("abc1", "", Mnet)
// XXX to wait for "M listens at ..." & "ready to start" -> XXX add something to M api? // XXX to wait for "M listens at ..." & "ready to start" -> XXX add something to M api?
tracer := &TraceRouter{tsync.NewSyncChan()} cG := tracetest.NewSyncChan("main")
tc := tsync.NewEventChecker(b, tracer.SyncChan) tG := tracetest.NewEventChecker(b, nil /* XXX */, cG)
pg := &tracing.ProbeGroup{}
tracer := NewTraceCollector(tdispatch1{cG})
tracing.Lock() tracing.Lock()
pnode := traceNodeChanged_Attach(nil, tracer.traceNode) pnode := traceNodeChanged_Attach(nil, tracer.traceNode)
traceMasterStartReady_Attach(pg, tracer.traceMasterStartReady) traceMasterStartReady_Attach(tracer.pg, tracer.traceMasterStartReady)
tracing.Unlock() tracing.Unlock()
wg.Go(func() error { wg.Go(func() error {
...@@ -937,7 +944,7 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f ...@@ -937,7 +944,7 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f
}) })
// determing M serving address XXX better with M api // determing M serving address XXX better with M api
ev := tracer.Recv() ev := cG.Recv()
mnode, ok := ev.Event.(*eventNodeTab) mnode, ok := ev.Event.(*eventNodeTab)
if !ok { if !ok {
b.Fatal("after M start: got %T ; want eventNodeTab", ev.Event) b.Fatal("after M start: got %T ; want eventNodeTab", ev.Event)
...@@ -951,15 +958,15 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f ...@@ -951,15 +958,15 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f
// now after we know Maddr create S & C and start S serving // now after we know Maddr create S & C and start S serving
S := NewStorage("abc1", Maddr, "", Snet, zstor) S := NewStorage("abc1", Maddr, "", Snet, zstor)
C := client.NewClient("abc1", Maddr, Cnet) C := NewClient("abc1", Maddr, Cnet)
wg.Go(func() error { wg.Go(func() error {
return S.Run(ctx) return S.Run(ctx)
}) })
// command M to start // command M to start
tc.Expect(masterStartReady(M, true)) // <- XXX better with M api tG.Expect(masterStartReady("m", true)) // <- XXX better with M api
pg.Done() tracer.Detach()
err := M.Start() err := M.Start()
if err != nil { if err != nil {
...@@ -1043,4 +1050,3 @@ func BenchmarkGetObjectTCPloParallel(b *testing.B) { ...@@ -1043,4 +1050,3 @@ func BenchmarkGetObjectTCPloParallel(b *testing.B) {
net := xnet.NetPlain("tcp") net := xnet.NetPlain("tcp")
benchmarkGetObjectParallel(b, net, net, net) benchmarkGetObjectParallel(b, net, net, net)
} }
*/
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
// Package tsync provides infrastructure for synchronous testing based on program tracing. // Package tracetest provides infrastructure for testing concurrent systems
// XXX naming -> ttest? tracetest? synctest? // based on synchronous events tracing.
// //
// A serial system can be verified by checking that its execution produces // A serial system can be verified by checking that its execution produces
// expected serial stream of events. But concurrent systems cannot be verified // expected serial stream of events. But concurrent systems cannot be verified
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
// However in a concurrent system one can decompose all events into serial // However in a concurrent system one can decompose all events into serial
// streams in which events are strictly ordered by causality with respect to // streams in which events are strictly ordered by causality with respect to
// each other. This decomposition in turn allows to verify that in every stream // each other. This decomposition in turn allows to verify that in every stream
// events were as expected. // events happenned as expected.
// //
// Verification of events for all streams can be done by one *sequential* // Verification of events for all streams can be done by one *sequential*
// process: // process:
...@@ -44,12 +44,37 @@ ...@@ -44,12 +44,37 @@
// causality (i.e. there is some happens-before relation for them) the // causality (i.e. there is some happens-before relation for them) the
// sequence of checking should represent that ordering relation. // sequence of checking should represent that ordering relation.
// //
// The package should be used as follows:
//
// - implement tracer that will be synchronously collecting events from
// execution of your program. This can be done with package
// lab.nexedi.com/kirr/go123/tracing or by other similar means.
//
// the tracer have to output events to dispatcher (see below).
//
// - implement router that will be making decisions specific to your
// particular testing scenario on to which stream an event should belong.
//
// the router will be consulted by dispatcher (see below) for its working.
//
// - create Dispatcher. This is the central place where events are
// delivered from tracer and are further delivered in accordance to what
// router says.
//
// - for every serial stream of events create synchronous delivery channel
// (SyncChan) and event Checker. XXX
//
//
// XXX more text describing how to use the package. // XXX more text describing how to use the package.
// //
//
// XXX say that checker will detect deadlock if there is no event or it or
// another comes at different trace channel.
//
// XXX (if tested system is serial only there is no need to use Dispatcher and // XXX (if tested system is serial only there is no need to use Dispatcher and
// routing - the collector can send output directly to the only SyncChan with // routing - the collector can send output directly to the only SyncChan with
// only one EventChecker connected to it). // only one EventChecker connected to it).
package tsync package tracetest
import ( import (
"fmt" "fmt"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment