Commit bae83b63 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ffde05a5
...@@ -496,7 +496,7 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { ...@@ -496,7 +496,7 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err error) { func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err error) {
fsSeq := xbufio.NewSeqReaderAt(fs.file) fsSeq := xbufio.NewSeqReaderAt(fs.file)
return BuildIndex(ctx, fsSeq) return BuildIndex(ctx, fsSeq, nil/*XXX no progress*/)
} }
// loadIndex loads on-disk index to RAM // loadIndex loads on-disk index to RAM
......
...@@ -31,14 +31,14 @@ import ( ...@@ -31,14 +31,14 @@ import (
) )
// Reindex rebuilds index for FileStorage file @ path // Reindex rebuilds index for FileStorage file @ path
func Reindex(ctx context.Context, path string) error { func Reindex(ctx context.Context, path string, progress func(*fs1.IndexUpdateProgress)) error {
// XXX lock path.lock ? // XXX lock path.lock ?
index, err := fs1.BuildIndexForFile(ctx, path) index, err := fs1.BuildIndexForFile(ctx, path, progress)
if err != nil { if err != nil {
return err return err
} }
err = index.SaveFile(path + ".index") err = index.SaveFile(path + ".index") // XXX show progress during SaveFile?
if err != nil { if err != nil {
return err // XXX err ctx return err // XXX err ctx
} }
...@@ -57,13 +57,16 @@ Rebuild FileStorage index ...@@ -57,13 +57,16 @@ Rebuild FileStorage index
options: options:
-quiet do not show intermediate progress.
-h --help this help text. -h --help this help text.
`) `)
} }
func reindexMain(argv []string) { func reindexMain(argv []string) {
quiet := false
flags := flag.FlagSet{Usage: func() { reindexUsage(os.Stderr) }} flags := flag.FlagSet{Usage: func() { reindexUsage(os.Stderr) }}
flags.Init("", flag.ExitOnError) flags.Init("", flag.ExitOnError)
flags.BoolVar(&quiet, "quiet", quiet, "do not show intermediate progress")
flags.Parse(argv[1:]) flags.Parse(argv[1:])
argv = flags.Args() argv = flags.Args()
...@@ -73,10 +76,36 @@ func reindexMain(argv []string) { ...@@ -73,10 +76,36 @@ func reindexMain(argv []string) {
} }
storPath := argv[0] storPath := argv[0]
err := Reindex(context.Background(), storPath) fi, err := os.Stat(storPath)
if err != nil {
zt.Fatal(err)
}
// progress display
progress := func(p *fs1.IndexUpdateProgress) {
topPos := p.TopPos
if topPos == -1 {
topPos = fi.Size()
}
fmt.Printf("Indexed data bytes: %.1f%% (%d/%d); #txn: %d, #oid: %d\n",
100 * float64(p.Index.TopPos) / float64(topPos),
p.Index.TopPos, topPos,
p.TxnIndexed, p.Index.Len())
}
if quiet {
progress = nil
}
err = Reindex(context.Background(), storPath, progress)
if err != nil { if err != nil {
zt.Fatal(err) zt.Fatal(err)
} }
if !quiet {
fmt.Println("done")
}
} }
// ---------------------------------------- // ----------------------------------------
...@@ -133,17 +162,16 @@ func verifyIdxMain(argv []string) { ...@@ -133,17 +162,16 @@ func verifyIdxMain(argv []string) {
bytesChecked := p.Index.TopPos - p.Iter.Txnh.Pos bytesChecked := p.Index.TopPos - p.Iter.Txnh.Pos
bytesAll := p.Index.TopPos bytesAll := p.Index.TopPos
fmt.Printf("Checked data bytes: %.1f%% (%d/%d); #txn: %d, #oid: %d\n", fmt.Printf("Checked data bytes: %.1f%% (%d/%d); #txn: %d, #oid: %d\n",
100 * float64(bytesChecked) / float64(bytesAll), // XXX /0 ? 100 * float64(bytesChecked) / float64(bytesAll),
bytesChecked, bytesAll, bytesChecked, bytesAll,
p.TxnChecked, len(p.OidChecked)) p.TxnChecked, len(p.OidChecked))
} else { } else {
fmt.Printf("Checked data transactions: %.1f%% (%d/%d); #oid: %d\n", fmt.Printf("Checked data transactions: %.1f%% (%d/%d); #oid: %d\n",
100 * float64(p.TxnChecked) / float64(p.TxnTotal), // XXX /0 ? 100 * float64(p.TxnChecked) / float64(p.TxnTotal),
p.TxnChecked, p.TxnTotal, len(p.OidChecked)) p.TxnChecked, p.TxnTotal, len(p.OidChecked))
} }
} }
if quiet { if quiet {
progress = nil progress = nil
} }
......
...@@ -413,6 +413,14 @@ func treeEqual(a, b *fsb.Tree) bool { ...@@ -413,6 +413,14 @@ func treeEqual(a, b *fsb.Tree) bool {
// --- build index from FileStorage data --- // --- build index from FileStorage data ---
// IndexUpdateProgress is data sent by Index.Update to notify about progress
type IndexUpdateProgress struct {
TopPos int64 // data range to update to; if = -1 -- till EOF
TxnIndexed int // # transactions read/indexed so far
Index *Index // index built so far
Iter *Iter // iterator through data XXX needed?
}
// Update updates in-memory index from r's FileStorage data in byte-range index.TopPos..topPos // Update updates in-memory index from r's FileStorage data in byte-range index.TopPos..topPos
// //
// The use case is: we have index computed till some position; we open // The use case is: we have index computed till some position; we open
...@@ -429,7 +437,7 @@ func treeEqual(a, b *fsb.Tree) bool { ...@@ -429,7 +437,7 @@ func treeEqual(a, b *fsb.Tree) bool {
// On success returned error is nil and index.TopPos is set to either: // On success returned error is nil and index.TopPos is set to either:
// - topPos (if it is != -1), or // - topPos (if it is != -1), or
// - r's position at which read got EOF (if topPos=-1). // - r's position at which read got EOF (if topPos=-1).
func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64) (err error) { func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, progress func (*IndexUpdateProgress)) (err error) {
defer xerr.Contextf(&err, "%s: reindex %v..%v", xio.Name(r), index.TopPos, topPos) defer xerr.Contextf(&err, "%s: reindex %v..%v", xio.Name(r), index.TopPos, topPos)
if topPos >= 0 && index.TopPos > topPos { if topPos >= 0 && index.TopPos > topPos {
...@@ -441,6 +449,13 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64) (er ...@@ -441,6 +449,13 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64) (er
// 2. we can be sure we build the whole index if we saw all oids // 2. we can be sure we build the whole index if we saw all oids
it := Iterate(r, index.TopPos, IterForward) it := Iterate(r, index.TopPos, IterForward)
pd := &IndexUpdateProgress{
TopPos: topPos,
Index: index,
Iter: it,
}
for { for {
// check ctx cancel once per transaction // check ctx cancel once per transaction
select { select {
...@@ -497,6 +512,12 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64) (er ...@@ -497,6 +512,12 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64) (er
for oid, pos := range update { for oid, pos := range update {
index.Set(oid, pos) index.Set(oid, pos)
} }
// notify progress
if progress != nil {
pd.TxnIndexed++
progress(pd)
}
} }
return nil return nil
...@@ -510,16 +531,16 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64) (er ...@@ -510,16 +531,16 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64) (er
// //
// In such cases the index building could be retried to be finished with // In such cases the index building could be retried to be finished with
// index.Update(). // index.Update().
func BuildIndex(ctx context.Context, r io.ReaderAt) (*Index, error) { func BuildIndex(ctx context.Context, r io.ReaderAt, progress func(*IndexUpdateProgress)) (*Index, error) {
index := IndexNew() index := IndexNew()
err := index.Update(ctx, r, -1) err := index.Update(ctx, r, -1, progress)
return index, err return index, err
} }
// BuildIndexForFile builds new in-memory index for data in file @ path // BuildIndexForFile builds new in-memory index for data in file @ path
// //
// See BuildIndex for semantic description. // See BuildIndex for semantic description.
func BuildIndexForFile(ctx context.Context, path string) (index *Index, err error) { func BuildIndexForFile(ctx context.Context, path string, progress func(*IndexUpdateProgress)) (index *Index, err error) {
f, err := os.Open(path) f, err := os.Open(path)
if err != nil { if err != nil {
return IndexNew(), err // XXX add err ctx? return IndexNew(), err // XXX add err ctx?
...@@ -533,7 +554,7 @@ func BuildIndexForFile(ctx context.Context, path string) (index *Index, err erro ...@@ -533,7 +554,7 @@ func BuildIndexForFile(ctx context.Context, path string) (index *Index, err erro
// use IO optimized for sequential access when building index // use IO optimized for sequential access when building index
fSeq := xbufio.NewSeqReaderAt(f) fSeq := xbufio.NewSeqReaderAt(f)
return BuildIndex(ctx, fSeq) return BuildIndex(ctx, fSeq, progress)
} }
// --- verify index against data in FileStorage --- // --- verify index against data in FileStorage ---
...@@ -553,12 +574,12 @@ func indexCorrupt(r io.ReaderAt, format string, argv ...interface{}) *IndexCorru ...@@ -553,12 +574,12 @@ func indexCorrupt(r io.ReaderAt, format string, argv ...interface{}) *IndexCorru
return &IndexCorruptError{DataFileName: xio.Name(r), Detail: fmt.Sprintf(format, argv...)} return &IndexCorruptError{DataFileName: xio.Name(r), Detail: fmt.Sprintf(format, argv...)}
} }
// IndexVerifyProgress is data sent by Index.Verify to progress // IndexVerifyProgress is data sent by Index.Verify to notify about progress
type IndexVerifyProgress struct { type IndexVerifyProgress struct {
TxnTotal int // total # of transactions to verify; if = -1 -- whole data TxnTotal int // total # of transactions to verify; if = -1 -- whole data
TxnChecked int TxnChecked int
Index *Index // index verification runs for Index *Index // index verification runs for
Iter *Iter // iterator thtough data Iter *Iter // iterator through data
OidChecked map[zodb.Oid]struct{} // oid checked so far OidChecked map[zodb.Oid]struct{} // oid checked so far
} }
...@@ -643,8 +664,9 @@ func (index *Index) Verify(ctx context.Context, r io.ReaderAt, ntxn int, progres ...@@ -643,8 +664,9 @@ func (index *Index) Verify(ctx context.Context, r io.ReaderAt, ntxn int, progres
} }
} }
// notify progress
if progress != nil { if progress != nil {
pd.TxnChecked = i pd.TxnChecked++
progress(pd) progress(pd)
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment