Commit 51e0d65d authored by Kirill Smelkov

X fix demo not to deadlock δ while it is verified

ZEO and NEO are not yet adjusted to new "watchq vs Close" requirement.
parent 2888bbbc
......@@ -32,6 +32,7 @@ import (
......@@ -467,6 +468,11 @@ func DrvTestWatch(t *testing.T, zurl string, zdrvOpen zodb.DriverOpener, bugv ..
// commit something more and wait a bit to raise chances the driver enqueues to watchq<- .
_ = xcommit(at, ZRawObject{0, b("at the end")})
// the driver must handle Close and cancel that watchq<-
err = zdrv.Close(); X(err)
e, ok := <-watchq
// Copyright (C) 2017-2019 Nexedi SA and Contributors.
// Copyright (C) 2017-2021 Nexedi SA and Contributors.
// Kirill Smelkov <>
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -46,12 +46,30 @@ type DriverOptions struct {
// Watchq can be nil to ignore such events. However if Watchq != nil, the events
// have to be consumed or else the storage driver will misbehave - e.g.
// it can get out of sync with the on-disk database file.
// it can get out of sync with the on-disk database file, or deadlock
// on any user-called operation.
// The storage driver closes !nil Watchq when the driver is closed.
// The storage driver will send only and all events in (at₀, +∞] range,
// where at₀ is at returned by driver open.
// The storage driver will stop sending events after call to Close.
// In particular the following example is valid and safe from deadlock:
// watchq := make(chan zodb.Event)
// stor, at0, err := zodb.OpenDriver(..., &DriverOptions{Watchq: watchq})
// defer stor.Close()
// for {
// select {
// case <-ctx.Done():
// return ctx.Err()
// case <-watchq:
// ...
// }
// }
Watchq chan<- Event
......@@ -73,7 +91,10 @@ func RegisterDriver(scheme string, opener DriverOpener) {
driverRegistry[scheme] = opener
// XXX ...
// OpenDriver opens ZODB storage driver by URL.
// It is similar to Open but returns low-level IStorageDriver instead of IStorage.
// Most users should use Open.
func OpenDriver(ctx context.Context, zurl string, opt *DriverOptions) (_ IStorageDriver, at0 Tid, _ error) {
// no scheme -> file://
if !strings.Contains(zurl, ":") {
......@@ -108,7 +129,7 @@ func OpenDriver(ctx context.Context, zurl string, opt *DriverOptions) (_ IStorag
// Users should import in storage packages they use or zodb/wks package to
// get support for well-known storages.
// Storage authors should register their storages with RegisterStorage.
// Storage authors should register their storages with RegisterDriver.
func Open(ctx context.Context, zurl string, opt *OpenOptions) (IStorage, error) {
drvWatchq := make(chan Event)
drvOpt := &DriverOptions{
......@@ -369,7 +390,7 @@ func (s *storage) _watcher() error {
func (s *storage) AddWatch(watchq chan<- Event) (at0 Tid) {
ack := make(chan Tid)
select {
// no longer operational: behave if watchq was registered before that
// no longer operational: behave as if watchq was registered before that
// and then seen down/close events. Interact with DelWatch directly.
case <-s.down:
s.headMu.Lock() // shutdown may be due to Close call and watcher might be
......@@ -51,6 +51,7 @@ type Storage struct {
baseAt0 zodb.Tid
baseWatchq <-chan zodb.Event
δWatchq <-chan zodb.Event // nil if demo is opened with watchq=nil
δWatchq0 <-chan zodb.Event // buffer for δ events queued while δ was initially verified
watchq chan<- zodb.Event // user requested to deliver events here
watchWG *xsync.WorkGroup
......@@ -59,6 +60,9 @@ type Storage struct {
downOnce sync.Once
down chan struct{} // ready when storage is down
downErr error // reason for shutdown
closeOnce sync.Once
closed chan struct{} // ready when storage is Closed
// baseMutatedError is reported when Storage.base is detected to change.
......@@ -91,6 +95,13 @@ func (d *Storage) watcher(ctx context.Context) error {
for {
var δWatchq <-chan zodb.Event
if len(d.δWatchq0) != 0 {
δWatchq = d.δWatchq0
} else {
δWatchq = d.δWatchq
select {
// Close requests to stop watching
case <-ctx.Done():
......@@ -117,22 +128,33 @@ func (d *Storage) watcher(ctx context.Context) error {
ev := &zodb.EventError{&zodb.OpError{URL: d.URL(), Op: "watch", Err: edown}}
ev := &zodb.EventError{&zodb.OpError{URL: d.URL(), Op: "watcher", Err: edown}}
if d.watchq != nil {
d.watchq <- ev // XXX + select on close ?
select {
case <-d.closed:
// wakeup to return edown
case d.watchq <- ev:
// ok
return edown
// event on δ -> proxy to user
case event, ok := <-d.δWatchq:
case event, ok := <-δWatchq:
if !ok {
// δ closed
d.δWatchq = nil
// XXX +select on close
d.watchq <- event // !nil because d.δWatchq != nil
select {
case <-d.closed:
return nil
case d.watchq <- event: // !nil because d.δWatchq != nil
// ok
......@@ -156,6 +178,9 @@ func (d *Storage) Close() (err error) {
d.closeOnce.Do(func() {
errδ := d.δ.Close()
errBase := d.base.Close()
......@@ -343,34 +368,68 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
// XXX read watchqs while we verify? (not to deadlock)
// verify that either
// - δ is all empty (just created), or
// - all δ transactions come strictly after base.
at0 := baseAt0
var δEventq0 []zodb.Event
if δAt0 != 0 {
if δAt0 < baseAt0 {
return nil, zodb.InvalidTid, fmt.Errorf("base is ahead of δ: base.head=%s δ.head=%s", baseAt0, δAt0)
iδ := δ.Iterate(ctx, 0, baseAt0)
for {
δtxni, _, err := iδ.NextTxn(ctx)
if err == io.EOF {
break // all ok - nothing in δ
if err != nil {
return nil, zodb.InvalidTid, err
// read and queue data from δWatchq while we verify δ (not to deadlock δ driver)
δq0Stop := make(chan struct{}) // reader <- main "stop"
δq0Done := make(chan struct{}) // reader -> main "done"
go func() {
defer close(δq0Done)
for {
select {
case <-δq0Stop:
case δevent, ok := <-δWatchq:
if !ok {
δEventq0 = append(δEventq0, δevent)
// verify δ
iδ := δ.Iterate(ctx, 0, baseAt0)
δtxni, _, err := iδ.NextTxn(ctx)
// TODO iδ.Close()
switch {
case err == io.EOF:
err = nil // ok - nothing in δ
case err == nil:
// there is a δ transaction ∈ [δAt0, baseAt0)
// TODO iδ.Close()
return nil, zodb.InvalidTid, fmt.Errorf("base overlaps with δ: base.head=%s δ.tail=%s", baseAt0, δtxni.Tid)
err = fmt.Errorf("base overlaps with δ: base.head=%s δ.tail=%s", baseAt0, δtxni.Tid)
if err != nil {
return nil, zodb.InvalidTid, err
at0 = δAt0
// requeue δWatchq0 <- δEventq0
var δWatchq0 chan zodb.Event
if l := len(δEventq0); l != 0 {
δWatchq0 = make(chan zodb.Event, l)
for _, ev := range δEventq0 {
δWatchq0 <- ev
d := &Storage{
base: base,
δ : δ,
......@@ -378,9 +437,11 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
baseAt0: baseAt0,
baseWatchq: baseWatchq,
δWatchq: δWatchq,
δWatchq0: δWatchq0,
watchq: opt.Watchq,
down: make(chan struct{}),
down: make(chan struct{}),
closed: make(chan struct{}),
// spawn watcher to listen on baseWatchq and shutdown storage if base changes.
......@@ -228,17 +228,20 @@ func TestWatchLoad_vs_BaseMutate(t *testing.T) {
tid, err := ddat.MutateBase(); X(err)
// first wait for error from watchq
event := <-watchq
event, ok := <-watchq
if !ok {
t.Fatal("after base mutate: premature watchq close")
evErr, ok := event.(*zodb.EventError)
if !ok {
t.Fatalf("unexpected event: %T", event)
t.Fatalf("after base mutate: unexpected event: %T", event)
errBaseMutated := &baseMutatedError{
baseAt0: 0,
baseHead: tid,
evErrOk := &zodb.EventError{&zodb.OpError{URL: ddrv.URL(), Op: "watch", Err: errBaseMutated}}
evErrOk := &zodb.EventError{&zodb.OpError{URL: ddrv.URL(), Op: "watcher", Err: errBaseMutated}}
if !reflect.DeepEqual(evErr, evErrOk) {
t.Fatalf("after base mutate: unexpected event:\nhave: %s\nwant: %s", evErr, evErrOk)
......@@ -109,6 +109,9 @@ type FileStorage struct {
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher
errClose error // error from .file.Close()
watchWg sync.WaitGroup // to wait for watcher finish
closed chan struct{} // ready when storage was Closed
closeOnce sync.Once
// IStorageDriver
......@@ -468,7 +471,7 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) {
// XXX it can also be internal.poll.ErrFileClosing
e.Err.Error() == "use of closed file") {
select {
case <-fs.down:
case <-fs.closed:
err = nil
......@@ -485,7 +488,13 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) {
if fs.watchq != nil {
if err != nil {
fs.watchq <- &zodb.EventError{err}
select {
case <-fs.closed:
// closed - skip send to watchq
case fs.watchq <- &zodb.EventError{err}:
// ok
......@@ -535,9 +544,9 @@ mainloop:
if !first {
traceWatch("select ...")
select {
case <-fs.down:
case <-fs.closed:
// closed
return nil
case err := <-w.Errors:
......@@ -694,7 +703,7 @@ mainloop:
// notify client
if fs.watchq != nil {
select {
case <-fs.down:
case <-fs.closed:
return nil
case fs.watchq <- &zodb.EventCommit{it.Txnh.Tid, δoid}:
......@@ -772,6 +781,9 @@ func (fs *FileStorage) shutdown(reason error) {
func (fs *FileStorage) Close() error {
fs.closeOnce.Do(func() {
......@@ -794,6 +806,7 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
fs := &FileStorage{
watchq: opt.Watchq,
down: make(chan struct{}),
closed: make(chan struct{}),
f, err := os.Open(path)
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment