Commit 7d1ae1a1 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 731e6b32
...@@ -452,6 +452,7 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher) { ...@@ -452,6 +452,7 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher) {
defer w.Close() // XXX lclose defer w.Close() // XXX lclose
err := fs._watcher(w) err := fs._watcher(w)
// it is ok if we got read error due to file being closed // it is ok if we got read error due to file being closed
// XXX it can also be ErrFileClosing which is internal
if e, _ := errors.Cause(err).(*os.PathError); e != nil && e.Err == os.ErrClosed { if e, _ := errors.Cause(err).(*os.PathError); e != nil && e.Err == os.ErrClosed {
select { select {
case <-fs.down: case <-fs.down:
...@@ -646,9 +647,8 @@ func (fs *FileStorage) Close() error { ...@@ -646,9 +647,8 @@ func (fs *FileStorage) Close() error {
} }
// Open opens FileStorage @path. // Open opens FileStorage @path.
//
// TODO read-write support
func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileStorage, err error) { func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileStorage, err error) {
// TODO read-write support
if !opt.ReadOnly { if !opt.ReadOnly {
return nil, fmt.Errorf("fs1: %s: TODO write mode not implemented", path) return nil, fmt.Errorf("fs1: %s: TODO write mode not implemented", path)
} }
...@@ -693,6 +693,8 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto ...@@ -693,6 +693,8 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
log.Print(err) log.Print(err)
log.Printf("%s: index rebuild...", path) log.Printf("%s: index rebuild...", path)
index, err = BuildIndex(ctx, fseq, nil/*no progress; XXX log it? */) index, err = BuildIndex(ctx, fseq, nil/*no progress; XXX log it? */)
// XXX cause=ErrUnexpectedEOF -> let watcher decide what was
// it: garbage or in-progress transaction
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -367,6 +367,8 @@ func BenchmarkIterate(b *testing.B) { ...@@ -367,6 +367,8 @@ func BenchmarkIterate(b *testing.B) {
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
X := exc.Raiseif
//xtesting.NeedPy(t, "zodbtools") //xtesting.NeedPy(t, "zodbtools")
needZODBPy(t) needZODBPy(t)
workdir := xworkdir(t) workdir := xworkdir(t)
...@@ -398,10 +400,7 @@ func TestWatch(t *testing.T) { ...@@ -398,10 +400,7 @@ func TestWatch(t *testing.T) {
cmd:= exec.Command("python2", "-m", "zodbtools.zodb", "commit", tfs, at.String()) cmd:= exec.Command("python2", "-m", "zodbtools.zodb", "commit", tfs, at.String())
cmd.Stdin = zin cmd.Stdin = zin
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr
out, err := cmd.Output() out, err := cmd.Output(); X(err)
if err != nil {
return zodb.InvalidTid, err
}
out = bytes.TrimSuffix(out, []byte("\n")) out = bytes.TrimSuffix(out, []byte("\n"))
tid, err := zodb.ParseTid(string(out)) tid, err := zodb.ParseTid(string(out))
...@@ -416,10 +415,7 @@ func TestWatch(t *testing.T) { ...@@ -416,10 +415,7 @@ func TestWatch(t *testing.T) {
//tracef("-> xcommit %s", at) //tracef("-> xcommit %s", at)
//defer tracef("<- xcommit") //defer tracef("<- xcommit")
t.Helper() t.Helper()
tid, err := zcommit(at, objv...) tid, err := zcommit(at, objv...); X(err)
if err != nil {
t.Fatal(err)
}
return tid return tid
} }
...@@ -432,10 +428,7 @@ func TestWatch(t *testing.T) { ...@@ -432,10 +428,7 @@ func TestWatch(t *testing.T) {
checkLastTid := func(lastOk zodb.Tid) { checkLastTid := func(lastOk zodb.Tid) {
t.Helper() t.Helper()
head, err := fs.LastTid(ctx) head, err := fs.LastTid(ctx); X(err)
if err != nil {
t.Fatalf("check last_tid: %s", err)
}
if head != lastOk { if head != lastOk {
t.Fatalf("check last_tid: got %s; want %s", head, lastOk) t.Fatalf("check last_tid: got %s; want %s", head, lastOk)
} }
...@@ -467,10 +460,7 @@ func TestWatch(t *testing.T) { ...@@ -467,10 +460,7 @@ func TestWatch(t *testing.T) {
checkLastTid(at) checkLastTid(at)
} }
err := fs.Close() err := fs.Close(); X(err)
if err != nil {
t.Fatal(err)
}
e, ok := <-watchq e, ok := <-watchq
if ok { if ok {
......
...@@ -453,7 +453,10 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, pro ...@@ -453,7 +453,10 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, pro
return err return err
} }
// XXX check txnh.Status != TxnInprogress // this transaction was only voted, not fully committed yet.
if it.Txnh.Status == zodb.TxnInprogress {
return nil
}
// check for topPos overlapping txn & whether we are done. // check for topPos overlapping txn & whether we are done.
// topPos=-1 will never match here // topPos=-1 will never match here
...@@ -693,7 +696,7 @@ func (index *Index) VerifyForFile(ctx context.Context, path string, ntxn int, pr ...@@ -693,7 +696,7 @@ func (index *Index) VerifyForFile(ctx context.Context, path string, ntxn int, pr
return nil, err return nil, err
} }
topPos := fi.Size() // XXX there might be last TxnInprogress transaction XXX topPos := fi.Size() // XXX there might be last TxnInprogress transaction XXX
if index.TopPos != topPos { if index.TopPos != topPos {
return nil, indexCorrupt(f, "topPos mismatch: data=%v index=%v", topPos, index.TopPos) return nil, indexCorrupt(f, "topPos mismatch: data=%v index=%v", topPos, index.TopPos)
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment