Commit 99980ffa authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c5b00c4b
...@@ -1256,6 +1256,79 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { ...@@ -1256,6 +1256,79 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
// --- rebuilding index --- // --- rebuilding index ---
// updateIndex updates index from r's data in byte-range index.TopPos..topPos
//
// The use case is: we have index computed till some position; we open
// FileStorage and see there is more data; we update index from data range
// not-yet covered by the index.
//
// topPos=-1 means range to update from is index.TopPos..EOF
//
// XXX on error existing index is in invalid state?
func updateIndex(ctx context.Context, index *Index, r io.ReaderAt, topPos int64) (err error) {
// XXX err ctx
defer func() {
if err != nil {
index.Clear()
index.TopPos = txnValidFrom
}
}()
// XXX if topPos >= 0 && topPos < index.TopPos {
// error
// }
it := Iterate(r, index.TopPos, IterForward)
loop:
for {
// check ctx cancel once per transaction
select {
case <-ctx.Done():
return ctx.Err()
default:
}
err = it.NextTxn(LoadNoStrings)
if err != nil {
err = okEOF(err)
break
}
// XXX check txnh.Status != TxnInprogress
// check for overlapping txn & whether we are done.
// topPos=-1 will never match here
if it.Txnh.Pos < topPos && (it.Txnh.Pos + it.Txnh.Len) >= topPos {
return fmt.Errorf("transaction %v @%v overlaps requested topPos %v",
it.Txnh.Tid, it.Txnh.Pos, topPos)
}
if it.Txnh.Pos == topPos {
return nil
}
index.TopPos = it.Txnh.Pos + it.Txnh.Len
for {
err = it.NextData()
if err != nil {
err = okEOF(err)
if err != nil {
break loop
}
break
}
index.Set(it.Datah.Oid, it.Datah.Pos)
}
}
if err != nil {
return err
}
return nil
}
// computeIndex builds new in-memory index for FileStorage // computeIndex builds new in-memory index for FileStorage
// XXX naming // XXX naming
// XXX in case of error return partially built index? (index has .TopPos until which it covers the data) // XXX in case of error return partially built index? (index has .TopPos until which it covers the data)
...@@ -1270,6 +1343,18 @@ func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err erro ...@@ -1270,6 +1343,18 @@ func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err erro
// 2. we can be sure we build the whole index if we saw all oids // 2. we can be sure we build the whole index if we saw all oids
fsSeq := xbufio.NewSeqReaderAt(fs.file) fsSeq := xbufio.NewSeqReaderAt(fs.file)
err = updateIndex(ctx, index, fsSeq, -1)
if err != nil {
return nil, err // XXX ok?
}
return index, nil
/*
it := Iterate(fsSeq, index.TopPos, IterForward) it := Iterate(fsSeq, index.TopPos, IterForward)
loop: loop:
...@@ -1309,10 +1394,50 @@ loop: ...@@ -1309,10 +1394,50 @@ loop:
return nil, err return nil, err
} }
return index, nil return index, nil
*/
} }
// checkIndexSane quickly checks index sanity.
// It scans few transactions starting from index.TopPos backwards and verifies
// whether oid there have correct entries in the index.
// XXX return: ? (how calling code should distinguish IO error on main file from consistency check error)
func checkIndexSane(index *Index, r io.ReaderAt) (err error) {
defer xerr.Contextf(&err, "index quick check") // XXX +main file
it := Iterate(r, index.TopPos, IterBackward)
for i := 0; i < 10; i++ {
err := it.NextTxn(LoadNoStrings)
if err != nil {
err = okEOF(err)
return err // XXX err ctx
}
for {
err = it.NextData()
if err != nil {
if err == io.EOF {
break
}
return err // XXX err ctx
}
dataPos, ok := index.Get(it.Datah.Oid)
if !ok {
return fmt.Errorf("oid %v @%v: no index entry", it.Datah.Oid, it.Datah.Pos)
}
if dataPos != it.Datah.Pos {
return fmt.Errorf("oid %v @%v: index has wrong pos (%v)", it.Datah.Oid, it.Datah.Pos, dataPos)
}
}
}
return nil
}
// loadIndex loads on-disk index to RAM // loadIndex loads on-disk index to RAM
func (fs *FileStorage) loadIndex() error { func (fs *FileStorage) loadIndex() (err error) {
// XXX lock? // XXX lock?
defer xerr.Contextf(&err, "%s: index load", fs.file.Name()) defer xerr.Contextf(&err, "%s: index load", fs.file.Name())
...@@ -1338,7 +1463,7 @@ func (fs *FileStorage) saveIndex() (err error) { ...@@ -1338,7 +1463,7 @@ func (fs *FileStorage) saveIndex() (err error) {
idxname := fs.file.Name() + ".index" idxname := fs.file.Name() + ".index"
idxtmp := idxname + ".index_tmp" idxtmp := idxname + ".index_tmp"
err := fs.index.SaveFile(idxtmp) err = fs.index.SaveFile(idxtmp)
if err != nil { if err != nil {
return err return err
} }
......
...@@ -96,3 +96,4 @@ func reindexMain(argv []string) { ...@@ -96,3 +96,4 @@ func reindexMain(argv []string) {
} }
// TODO verify-index // TODO verify-index
// TODO verify-index -quick (only small sanity check)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment