Commit 88848c31 authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb/fs1: Fix it to provide "Close vs watchq" guaranty

Provide guaranty that Close forces the driver to stop sending to watchq
and to close it. See a5dbb92b ("go/zodb: Require drivers to close watchq
on Close") for details.
parent a5dbb92b
// Copyright (C) 2017-2020 Nexedi SA and Contributors. // Copyright (C) 2017-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -109,6 +109,9 @@ type FileStorage struct { ...@@ -109,6 +109,9 @@ type FileStorage struct {
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher downOnce sync.Once // shutdown may be due to both Close and IO error in watcher
errClose error // error from .file.Close() errClose error // error from .file.Close()
watchWg sync.WaitGroup // to wait for watcher finish watchWg sync.WaitGroup // to wait for watcher finish
closed chan struct{} // ready when storage was Closed
closeOnce sync.Once
} }
// IStorageDriver // IStorageDriver
...@@ -455,7 +458,7 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) { ...@@ -455,7 +458,7 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) {
// XXX it can also be internal.poll.ErrFileClosing // XXX it can also be internal.poll.ErrFileClosing
e.Err.Error() == "use of closed file") { e.Err.Error() == "use of closed file") {
select { select {
case <-fs.down: case <-fs.closed:
err = nil err = nil
default: default:
} }
...@@ -472,7 +475,13 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) { ...@@ -472,7 +475,13 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) {
if fs.watchq != nil { if fs.watchq != nil {
if err != nil { if err != nil {
fs.watchq <- &zodb.EventError{err} select {
case <-fs.closed:
// closed - skip send to watchq
case fs.watchq <- &zodb.EventError{err}:
// ok
}
} }
close(fs.watchq) close(fs.watchq)
} }
...@@ -522,9 +531,9 @@ mainloop: ...@@ -522,9 +531,9 @@ mainloop:
if !first { if !first {
traceWatch("select ...") traceWatch("select ...")
select { select {
case <-fs.down: case <-fs.closed:
// closed // closed
traceWatch("down") traceWatch("closed")
return nil return nil
case err := <-w.Errors: case err := <-w.Errors:
...@@ -680,7 +689,7 @@ mainloop: ...@@ -680,7 +689,7 @@ mainloop:
// notify client // notify client
if fs.watchq != nil { if fs.watchq != nil {
select { select {
case <-fs.down: case <-fs.closed:
return nil return nil
case fs.watchq <- &zodb.EventCommit{it.Txnh.Tid, δoid}: case fs.watchq <- &zodb.EventCommit{it.Txnh.Tid, δoid}:
...@@ -758,6 +767,9 @@ func (fs *FileStorage) shutdown(reason error) { ...@@ -758,6 +767,9 @@ func (fs *FileStorage) shutdown(reason error) {
} }
func (fs *FileStorage) Close() error { func (fs *FileStorage) Close() error {
fs.closeOnce.Do(func() {
close(fs.closed)
})
fs.shutdown(fmt.Errorf("closed")) fs.shutdown(fmt.Errorf("closed"))
fs.watchWg.Wait() fs.watchWg.Wait()
...@@ -780,6 +792,7 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto ...@@ -780,6 +792,7 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
fs := &FileStorage{ fs := &FileStorage{
watchq: opt.Watchq, watchq: opt.Watchq,
down: make(chan struct{}), down: make(chan struct{}),
closed: make(chan struct{}),
} }
f, err := os.Open(path) f, err := os.Open(path)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment