Commit 0853555b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 2ec2288e
...@@ -106,7 +106,7 @@ func TestTracetestExample(t *testing.T) { ...@@ -106,7 +106,7 @@ func TestTracetestExample(t *testing.T) {
}() }()
// assert that events come as expected // assert that events come as expected
t.Expect("t3", eventHi("T1·C")) t.Expect("t2", eventHi("T1·C"))
t.Expect("t1", eventHi("T1·A")) t.Expect("t1", eventHi("T1·A"))
t.Expect("t1", eventHello("T1·B")) t.Expect("t1", eventHello("T1·B"))
......
...@@ -120,6 +120,9 @@ type ChanTx interface { ...@@ -120,6 +120,9 @@ type ChanTx interface {
// Send sends event to a consumer and waits for ack. // Send sends event to a consumer and waits for ack.
// if main testing goroutine detects any problem Send panics. XXX // if main testing goroutine detects any problem Send panics. XXX
Send(event interface{}) Send(event interface{})
// Close closes the sending side of the channel.
Close()
} }
// ChanRx represents "receive-only" half of Chan. // ChanRx represents "receive-only" half of Chan.
...@@ -140,7 +143,7 @@ type ChanRx interface { ...@@ -140,7 +143,7 @@ type ChanRx interface {
// The goroutine which sent the message will wait for Ack before continue. // The goroutine which sent the message will wait for Ack before continue.
type Msg struct { type Msg struct {
Event interface {} Event interface {}
ack chan<- error // nil on Ack; !nil on Nak ack chan<- error // nil on Ack; !nil on nak
} }
...@@ -170,6 +173,11 @@ func (ch *_chan) Send(event interface{}) { ...@@ -170,6 +173,11 @@ func (ch *_chan) Send(event interface{}) {
} }
} }
// Close implements ChanTx.
func (ch *_chan) Close() {
close(ch.msgq)
}
// Recv implements ChanRx. // Recv implements ChanRx.
func (ch *_chan) Recv() *Msg { func (ch *_chan) Recv() *Msg {
msg := <-ch.msgq msg := <-ch.msgq
...@@ -182,13 +190,13 @@ func (ch *_chan) _rxq() <-chan *Msg { ...@@ -182,13 +190,13 @@ func (ch *_chan) _rxq() <-chan *Msg {
} }
// XXX -> Unpause? Cont? Continue? // XXX -> Unpause? Cont? Continue?
// XXX Ack(err)? or add Nak(err)?
// Ack acknowledges the event was processed and unblocks producer goroutine. // Ack acknowledges the event was processed and unblocks producer goroutine.
func (m *Msg) Ack() { func (m *Msg) Ack() {
m.ack <- nil m.ack <- nil
} }
func (m *Msg) Nak(why string) { // XXX it should be called only by tracetest internals from under Fatal
func (m *Msg) nak(why string) {
m.ack <- errors.New(why) m.ack <- errors.New(why)
} }
...@@ -577,8 +585,8 @@ func (t *T) expect1(stream string, eventExpect interface{}) *Msg { ...@@ -577,8 +585,8 @@ func (t *T) expect1(stream string, eventExpect interface{}) *Msg {
revent := reventp.Elem() revent := reventp.Elem()
if !reflect.DeepEqual(revent.Interface(), reventExpect.Interface()) { if !reflect.DeepEqual(revent.Interface(), reventExpect.Interface()) {
msg.Nak("expect failed") //msg.Nak("expect failed") // XXX merge into Fatal
t.Fatalf("%s: expect: %s:\nwant: %v\nhave: %v\ndiff: %s", t.Fatalf("%s: expect: %s:\nwant: %v\nhave: %v\ndiff:\n%s",
stream, stream,
reventExpect.Type(), reventExpect, revent, reventExpect.Type(), reventExpect, revent,
pretty.Compare(reventExpect.Interface(), revent.Interface())) pretty.Compare(reventExpect.Interface(), revent.Interface()))
...@@ -606,12 +614,13 @@ func (t *T) xget1(stream string, eventp interface{}) *Msg { ...@@ -606,12 +614,13 @@ func (t *T) xget1(stream string, eventp interface{}) *Msg {
// ok // ok
case <-time.After(*deadTime): case <-time.After(*deadTime):
t.deadlock(stream, eventp) //t.deadlock(stream, eventp)
t.Fatalf("%s: deadlock waiting for %T\n", stream, eventp)
} }
reventp := reflect.ValueOf(eventp) reventp := reflect.ValueOf(eventp)
if reventp.Type().Elem() != reflect.TypeOf(msg.Event) { if reventp.Type().Elem() != reflect.TypeOf(msg.Event) {
// msg.nak <- error msg.nak("test failed") // XXX message ok?
t.Fatalf("%s: expect: %s: got %#v", ch.name(), reventp.Elem().Type(), msg.Event) t.Fatalf("%s: expect: %s: got %#v", ch.name(), reventp.Elem().Type(), msg.Event)
} }
...@@ -675,7 +684,7 @@ func (t *T) deadlock(stream string, eventp interface{}) { ...@@ -675,7 +684,7 @@ func (t *T) deadlock(stream string, eventp interface{}) {
// comes first, and their "panics" don't get intermixed with it. // comes first, and their "panics" don't get intermixed with it.
t.Log(bad) t.Log(bad)
for _, __ := range sendv { for _, __ := range sendv {
__.msg.Nak("deadlock") __.msg.nak("deadlock")
} }
t.FailNow() t.FailNow()
} }
...@@ -702,6 +711,69 @@ func (t *T) fatalfInNonMain(format string, argv ...interface{}) { ...@@ -702,6 +711,69 @@ func (t *T) fatalfInNonMain(format string, argv ...interface{}) {
runtime.Goexit() runtime.Goexit()
} }
// T overrides FailNow/Fatal/Fatalf to also nak all in-progress sends.
func (t *T) FailNow() {
t.Helper()
// mark streamTab no longer operational XXX ok?
t.streamTabMu.Lock()
streamTab := t.streamTab
t.streamTab = nil
t.streamTabMu.Unlock()
type sendInfo struct{ch Chan; msg *Msg}
var sendv []sendInfo
for _, ch := range streamTab {
// check whether someone is sending on a dst without blocking.
// if yes - report to sender there is a problem - so it can cancel its task.
select {
case msg := <-ch._rxq():
sendv = append(sendv, sendInfo{ch, msg})
default:
}
// in any case close channel where future Sends may arrive so that will panic too.
ch.Close()
}
// order channels by name
sort.Slice(sendv, func(i, j int) bool {
return strings.Compare(sendv[i].ch.name(), sendv[j].ch.name()) < 0
})
bad := ""
if len(sendv) == 0 {
bad += fmt.Sprintf("noone is sending\n")
} else {
bad += fmt.Sprintf("there are %d pending sender(s) on other channel(s):\n", len(sendv))
for _, __ := range sendv {
bad += fmt.Sprintf("%s:\t%T %v\n", __.ch.name(), __.msg.Event, __.msg.Event)
}
}
// log the deadlock details and nak all senders.
// nak them only after deadlock printout, so that the deadlock text
// comes first, and their "panics" don't get intermixed with it.
t.Log(bad)
for _, __ := range sendv {
__.msg.nak("XXX deadlock") // XXX reason can be custom / different
}
t.TB.FailNow()
}
func (t *T) Fatal(argv ...interface{}) {
t.Helper()
t.Log(argv...)
t.FailNow()
}
func (t *T) Fatalf(format string, argv ...interface{}) {
t.Helper()
t.Logf(format, argv...)
t.FailNow()
}
// TODO func Verify(system) // TODO func Verify(system)
...@@ -719,11 +791,16 @@ func Verify(t testing.TB, testf func(t *T)) { ...@@ -719,11 +791,16 @@ func Verify(t testing.TB, testf func(t *T)) {
tT := &T{TB: t, streamTab: make(map[string]Chan)} tT := &T{TB: t, streamTab: make(map[string]Chan)}
// XXX // XXX
testf(tT) // XXX not here
/*
// verify in the end that no events are left unchecked / unconsumed
// (e.g. sent to RxEvent, but not received). Nak them if they are ... XXX
defer func() {
}()
*/
testf(tT)
// XXX in the end: verify that no events are left unchecked /
// unconsumed (e.g. sent to Dispatch, but not received).
// XXX in the end: verify that streams are the same from run to run (if completed successfully). // XXX in the end: verify that streams are the same from run to run (if completed successfully).
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment