Commit a676063e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent fca52539
...@@ -44,33 +44,33 @@ ...@@ -44,33 +44,33 @@
// causality (i.e. there is some happens-before relation for them) the // causality (i.e. there is some happens-before relation for them) the
// sequence of checking should represent that ordering relation. // sequence of checking should represent that ordering relation.
// //
// The package should be used as follows: XXX rework // Basic package usage is as follows:
// //
// XXX tracer -> trace collector? // func TestSomething(t *testing.T) {
// - implement tracer that will be synchronously collecting events from // tracetest.Verify(t, func(t *tracetest.T) {
// execution of your program. This can be done with package // // setup tracing so that events of test system are collected and
// lab.nexedi.com/kirr/go123/tracing or by other similar means. // // synchronously delivered to t.RxEvent. This can be done with
// // e.g. package lab.nexedi.com/kirr/go123/tracing or by other similar means.
// ...
// //
// the tracer have to output events to dispatcher (see below). // // tell t to which stream an event should go.
// t.SetEventRouter(...)
// //
// - implement router that will be making decisions specific to your XXX -> Streams // // run the system and verify it produces expected events
// particular testing scenario on to which stream an event should belong.
// //
// the router will be consulted by dispatcher (see below) for its working. // // <code to start the system>
// t.Expect("<stream₁>", eventOk₁)
// t.Expect("<stream₂>", eventOk₂)
// ...
// //
// - create Dispatcher. This is the central place where events are // // <code to further control/affect the system>
// delivered from tracer and are further delivered in accordance to what // t.Expect("<stream₃>", eventOk₃)
// router says. // t.Expect("<stream₄>", eventOk₄)
// ...
// })
// }
// //
// - for every serial stream of events create synchronous delivery channel // See example_test.go for more details.
// (Chan) and event Checker. XXX
//
//
// XXX more text describing how to use the package.
// XXX link to example.
//
//
// XXX say that deadlocks are detected?
package tracetest package tracetest
import ( import (
...@@ -124,17 +124,17 @@ type _ChanRx interface { ...@@ -124,17 +124,17 @@ type _ChanRx interface {
// Recv receives message from a producer. // Recv receives message from a producer.
// //
// The consumer, after dealing with the message, must send back an ack. // The consumer, after dealing with the message, must send back an ack.
Recv() *Msg Recv() *_Msg
// _rxq returns raw channel that is serving _ChanRx. // _rxq returns raw channel that is serving _ChanRx.
// it is used internally to use _ChanRx in select. // it is used internally to use _ChanRx in select.
_rxq() <-chan *Msg _rxq() <-chan *_Msg
} }
// Msg represents message with 1 event sent over _Chan. // _Msg represents message with 1 event sent over _Chan.
// //
// The goroutine which sent the message will wait for Ack before continue. // The goroutine which sent the message will wait for Ack before continue.
type Msg struct { type _Msg struct {
Event interface {} Event interface {}
ack chan<- error // nil on Ack; !nil on nak ack chan<- error // nil on Ack; !nil on nak
} }
...@@ -144,7 +144,7 @@ type Msg struct { ...@@ -144,7 +144,7 @@ type Msg struct {
// _chan implements _Chan. // _chan implements _Chan.
type _chan struct { type _chan struct {
t *T t *T
msgq chan *Msg msgq chan *_Msg
down chan struct{} // becomes ready when closed down chan struct{} // becomes ready when closed
_name string _name string
} }
...@@ -164,7 +164,7 @@ func (ch *_chan) Send(event interface{}) { ...@@ -164,7 +164,7 @@ func (ch *_chan) Send(event interface{}) {
case <-ch.down: case <-ch.down:
ch.t.fatalfInNonMain("%s: send: channel was closed", ch.name()) ch.t.fatalfInNonMain("%s: send: channel was closed", ch.name())
case ch.msgq <- &Msg{event, ack}: case ch.msgq <- &_Msg{event, ack}:
err := <-ack err := <-ack
if err != nil { if err != nil {
ch.t.fatalfInNonMain("%s: send: %s", ch.name(), err) ch.t.fatalfInNonMain("%s: send: %s", ch.name(), err)
...@@ -178,24 +178,24 @@ func (ch *_chan) Close() { ...@@ -178,24 +178,24 @@ func (ch *_chan) Close() {
} }
// Recv implements _ChanRx. // Recv implements _ChanRx.
func (ch *_chan) Recv() *Msg { func (ch *_chan) Recv() *_Msg {
msg := <-ch.msgq msg := <-ch.msgq
return msg return msg
} }
// _rxq implements _ChanRx. // _rxq implements _ChanRx.
func (ch *_chan) _rxq() <-chan *Msg { func (ch *_chan) _rxq() <-chan *_Msg {
return ch.msgq return ch.msgq
} }
// XXX -> Unpause? Cont? Continue? // XXX -> Unpause? Cont? Continue?
// Ack acknowledges the event was processed and unblocks producer goroutine. // Ack acknowledges the event was processed and unblocks producer goroutine.
func (m *Msg) Ack() { func (m *_Msg) Ack() {
m.ack <- nil m.ack <- nil
} }
// XXX it should be called only by tracetest internals from under Fatal // XXX it should be called only by tracetest internals from under Fatal
func (m *Msg) nak(why string) { func (m *_Msg) nak(why string) {
m.ack <- errors.New(why) m.ack <- errors.New(why)
} }
...@@ -203,7 +203,7 @@ func (m *Msg) nak(why string) { ...@@ -203,7 +203,7 @@ func (m *Msg) nak(why string) {
func _NewChan(name string) _Chan { func _NewChan(name string) _Chan {
// XXX somehow avoid channels with duplicate names // XXX somehow avoid channels with duplicate names
// (only allow to create named channels from under dispatcher?) // (only allow to create named channels from under dispatcher?)
return &_chan{msgq: make(chan *Msg), down: make(chan struct{}), _name: name} return &_chan{msgq: make(chan *_Msg), down: make(chan struct{}), _name: name}
} }
...@@ -243,12 +243,12 @@ type delayInjectState struct { ...@@ -243,12 +243,12 @@ type delayInjectState struct {
} }
type nak struct { type nak struct {
msg *Msg msg *_Msg
why string why string
} }
// XXX place; just use t.Cleanup instead? // XXX place; just use t.Cleanup instead?
func (t *T) queuenak(msg *Msg, why string) { func (t *T) queuenak(msg *_Msg, why string) {
t.nakq = append(t.nakq, nak{msg, why}) t.nakq = append(t.nakq, nak{msg, why})
} }
...@@ -362,7 +362,7 @@ func (t *T) Expect(stream string, eventOK interface{}) { ...@@ -362,7 +362,7 @@ func (t *T) Expect(stream string, eventOK interface{}) {
// expect1 receives next event on stream and verifies it to be equal to eventOK (both type and value). // expect1 receives next event on stream and verifies it to be equal to eventOK (both type and value).
// //
// if checks do not pass - fatal testing error is raised. // if checks do not pass - fatal testing error is raised.
func (t *T) expect1(stream string, eventExpect interface{}) *Msg { func (t *T) expect1(stream string, eventExpect interface{}) *_Msg {
t.Helper() t.Helper()
reventExpect := reflect.ValueOf(eventExpect) reventExpect := reflect.ValueOf(eventExpect)
...@@ -385,7 +385,7 @@ func (t *T) expect1(stream string, eventExpect interface{}) *Msg { ...@@ -385,7 +385,7 @@ func (t *T) expect1(stream string, eventExpect interface{}) *Msg {
// xget1 gets 1 event in place and checks it has expected type // xget1 gets 1 event in place and checks it has expected type
// //
// if checks do not pass - fatal testing error is raised // if checks do not pass - fatal testing error is raised
func (t *T) xget1(stream string, eventp interface{}) *Msg { func (t *T) xget1(stream string, eventp interface{}) *_Msg {
t.Helper() t.Helper()
t.mu.Lock() t.mu.Lock()
...@@ -394,7 +394,7 @@ func (t *T) xget1(stream string, eventp interface{}) *Msg { ...@@ -394,7 +394,7 @@ func (t *T) xget1(stream string, eventp interface{}) *Msg {
// XXX ch == nil -> no longer operational // XXX ch == nil -> no longer operational
var msg *Msg var msg *_Msg
select { select {
case msg = <-ch._rxq(): // unwrapped Recv case msg = <-ch._rxq(): // unwrapped Recv
...@@ -469,7 +469,7 @@ func (t *T) closeStreamTab() (nnak int) { ...@@ -469,7 +469,7 @@ func (t *T) closeStreamTab() (nnak int) {
} }
// print details about pending events and all streams // print details about pending events and all streams
type sendInfo struct{ch _Chan; msg *Msg} type sendInfo struct{ch _Chan; msg *_Msg}
var sendv []sendInfo // sends are pending here var sendv []sendInfo // sends are pending here
var quietv []_Chan // this channels are quiet var quietv []_Chan // this channels are quiet
...@@ -572,7 +572,7 @@ func Verify(t *testing.T, f func(t *T)) { ...@@ -572,7 +572,7 @@ func Verify(t *testing.T, f func(t *T)) {
} }
streams0 := streamsOfTrace(trace0) streams0 := streamsOfTrace(trace0)
// sort trace0 by time just in case - events migth come from multiple // sort trace0 by time just in case - events might come from multiple
// CPUs simultaneously, and so for close events they might be added to // CPUs simultaneously, and so for close events they might be added to
// tracev not in time order. // tracev not in time order.
sort.Slice(trace0, func(i, j int) bool { sort.Slice(trace0, func(i, j int) bool {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment