Commit 42ff83d9 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f8859a39
...@@ -195,6 +195,8 @@ const ( ...@@ -195,6 +195,8 @@ const (
delWatch watchOp = 1 delWatch watchOp = 1
) )
// watcher dispatches events from driver to subscribers and serves
// {Add,Del}Watch requests.
func (s *storage) watcher() { func (s *storage) watcher() {
for { for {
select { select {
......
...@@ -54,22 +54,24 @@ func TestWatch(t *testing.T) { ...@@ -54,22 +54,24 @@ func TestWatch(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
} }
obj := func(oid zodb.Oid, data string) xtesting.ZRawObject {
return xtesting.ZRawObject{oid, []byte(data)}
}
xcommit(obj(0, "data0"))
xcommit(xtesting.ZRawObject{0, []byte("data0")}) // open tfs at go side
stor, err := zodb.OpenStorage(context.Background(), tfs, &zodb.OpenOptions{ReadOnly: true}); X(err)
// spawn plain and verbose watchers // spawn plain and verbose watchers
ctx0, cancel := context.WithCancel(context.Background()) ctx0, cancel := context.WithCancel(context.Background())
wg, ctx := errgroup.WithContext(ctx0) wg, ctx := errgroup.WithContext(ctx0)
// gowatch spawns Watch(verbose) and returns expect(line) func that is // gowatch spawns Watch(verbose) and returns expectf() func that is
// connected to Watch output. // connected to verify Watch output.
gowatch := func(verbose bool) /*expectf*/func(format string, argv ...interface{}) { gowatch := func(verbose bool) /*expectf*/func(format string, argv ...interface{}) {
pr, pw := io.Pipe() pr, pw := io.Pipe()
wg.Go(func() error { wg.Go(func() error {
stor, err := zodb.OpenStorage(ctx, tfs, &zodb.OpenOptions{ReadOnly: true})
if err != nil {
return err
}
return Watch(ctx, stor, pw, verbose) return Watch(ctx, stor, pw, verbose)
}) })
...@@ -92,17 +94,20 @@ func TestWatch(t *testing.T) { ...@@ -92,17 +94,20 @@ func TestWatch(t *testing.T) {
pexpect := gowatch(false) pexpect := gowatch(false)
vexpect := gowatch(true) vexpect := gowatch(true)
// initial header
pexpect("# at %s", at) pexpect("# at %s", at)
vexpect("# at %s", at) vexpect("# at %s", at)
xcommit(xtesting.ZRawObject{0, []byte("data01")}) // commit -> output
xcommit(obj(0, "data01"))
pexpect("txn %s", at) pexpect("txn %s", at)
vexpect("txn %s", at) vexpect("txn %s", at)
vexpect("obj 0000000000000000") vexpect("obj 0000000000000000")
vexpect("") vexpect("")
xcommit(xtesting.ZRawObject{1, []byte("data1")}, xtesting.ZRawObject{2, []byte("data2")}) // commit -> output
xcommit(obj(1, "data1"), obj(2, "data2"))
pexpect("txn %s", at) pexpect("txn %s", at)
vexpect("txn %s", at) vexpect("txn %s", at)
...@@ -117,4 +122,12 @@ func TestWatch(t *testing.T) { ...@@ -117,4 +122,12 @@ func TestWatch(t *testing.T) {
if ecause != context.Canceled { if ecause != context.Canceled {
t.Fatalf("finished: err: expected 'canceled' cause; got %q", err) t.Fatalf("finished: err: expected 'canceled' cause; got %q", err)
} }
// commit after watchers canceled - storage must be alive
// (this verifies DelWatch)
// XXX move -> zodb tests
xcommit(obj(3, "data3"))
println("AAA")
stor.DelWatch(nil)
println("BBB")
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment