Commit 4c4ce2ea authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 15af0624
...@@ -98,103 +98,6 @@ func (fs *FileStorage) StorageName() string { ...@@ -98,103 +98,6 @@ func (fs *FileStorage) StorageName() string {
return "FileStorage v1" return "FileStorage v1"
} }
// open opens FileStorage without loading index
//
// TODO read-write support
func open(path string) (*FileStorage, error) {
fs := &FileStorage{}
f, err := os.Open(path)
if err != nil {
return nil, err
}
fs.file = f
// check file magic
fh := FileHeader{}
err = fh.Load(f)
if err != nil {
return nil, err
}
// determine topPos from file size
// if it is invalid (e.g. a transaction committed only half-way) we'll catch it
// while loading/recreating index XXX recheck this logic
fi, err := f.Stat()
if err != nil {
return nil, err
}
topPos := fi.Size()
// read tidMin/tidMax
// FIXME support empty file case -> then both txnhMin and txnhMax stays invalid
err = fs.txnhMin.Load(f, txnValidFrom, LoadAll) // XXX txnValidFrom here -> ?
if err != nil {
return nil, err
}
err = fs.txnhMax.Load(f, topPos, LoadAll)
// expect EOF but .LenPrev must be good
// FIXME ^^^ it will be no EOF if a txn was committed only partially
if err != io.EOF {
if err == nil {
err = fmt.Errorf("%s: no EOF after topPos", f.Name())
}
return nil, fmt.Errorf("%s: %s", f.Name(), err)
}
if fs.txnhMax.LenPrev <= 0 {
return nil, fmt.Errorf("%s: could not read LenPrev @%d (last transaction)", f.Name(), fs.txnhMax.Pos)
}
err = fs.txnhMax.LoadPrev(f, LoadAll)
if err != nil {
panic(err) // XXX
}
return fs, nil
}
// Open opens FileStorage @path.
//
// TODO read-write support
func Open(ctx context.Context, path string) (*FileStorage, error) {
// open data file
fs, err := open(path)
if err != nil {
return nil, err
}
// load/rebuild index
err = fs.loadIndex()
if err != nil {
log.Print(err)
log.Printf("%s: index recompute...", path)
// XXX if !ro -> .reindex() which saves it
fs.index, err = fs.computeIndex(ctx)
if err != nil {
fs.file.Close() // XXX lclose
return nil, err
}
}
// TODO verify index is sane / topPos matches
if fs.index.TopPos != fs.txnhMax.Pos + fs.txnhMax.Len {
panic("TODO: inconsistent index topPos") // XXX
}
return fs, nil
}
func (fs *FileStorage) Close() error {
// TODO dump index (if !ro ?)
err := fs.file.Close()
if err != nil {
return err
}
fs.file = nil
return nil
}
func (fs *FileStorage) LastTid(_ context.Context) (zodb.Tid, error) { func (fs *FileStorage) LastTid(_ context.Context) (zodb.Tid, error) {
// XXX check we have transactions at all - what to return if not? // XXX check we have transactions at all - what to return if not?
...@@ -220,7 +123,8 @@ func (e *ErrXidLoad) Error() string { ...@@ -220,7 +123,8 @@ func (e *ErrXidLoad) Error() string {
return fmt.Sprintf("loading %v: %v", e.Xid, e.Err) return fmt.Sprintf("loading %v: %v", e.Xid, e.Err)
} }
// freelist(DataHeader)
// freelist(DataHeader) XXX move -> format.go ?
var dhPool = sync.Pool{New: func() interface{} { return &DataHeader{} }} var dhPool = sync.Pool{New: func() interface{} { return &DataHeader{} }}
// DataHeaderAlloc allocates DataHeader from freelist. // DataHeaderAlloc allocates DataHeader from freelist.
...@@ -229,6 +133,8 @@ func DataHeaderAlloc() *DataHeader { ...@@ -229,6 +133,8 @@ func DataHeaderAlloc() *DataHeader {
} }
// Free puts dh back into DataHeader freelist. // Free puts dh back into DataHeader freelist.
//
// Caller must not use dh after call to Free.
func (dh *DataHeader) Free() { func (dh *DataHeader) Free() {
dhPool.Put(dh) dhPool.Put(dh)
} }
...@@ -512,7 +418,115 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.ITxnIterator { ...@@ -512,7 +418,115 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.ITxnIterator {
return ziter return ziter
} }
// --- rebuilding index --- // --- open + rebuild index --- TODO review completely
// open opens FileStorage without loading index
func open(path string) (_ *FileStorage, err error) {
fs := &FileStorage{}
f, err := os.Open(path)
if err != nil {
return nil, err
}
fs.file = f
defer func() {
if err != nil {
f.Close() // XXX -> lclose
}
}()
// check file magic
fh := FileHeader{}
err = fh.Load(f)
if err != nil {
return nil, err
}
// FIXME rework opening logic to support case when last txn was committed only partially
// determine topPos from file size
// if it is invalid (e.g. a transaction committed only half-way) we'll catch it
// while loading/recreating index XXX recheck this logic
fi, err := f.Stat()
if err != nil {
return nil, err
}
topPos := fi.Size()
// read tidMin/tidMax
// FIXME support empty file case -> then both txnhMin and txnhMax stays invalid
err = fs.txnhMin.Load(f, txnValidFrom, LoadAll) // XXX txnValidFrom here -> ?
if err != nil {
return nil, err
}
err = fs.txnhMax.Load(f, topPos, LoadAll)
// expect EOF but .LenPrev must be good
// FIXME ^^^ it will be no EOF if a txn was committed only partially
if err != io.EOF {
if err == nil {
err = fmt.Errorf("%s: no EOF after topPos", f.Name())
}
return nil, fmt.Errorf("%s: %s", f.Name(), err)
}
if fs.txnhMax.LenPrev <= 0 {
return nil, fmt.Errorf("%s: could not read LenPrev @%d (last transaction)", f.Name(), fs.txnhMax.Pos)
}
err = fs.txnhMax.LoadPrev(f, LoadAll)
if err != nil {
return nil, err
}
return fs, nil
}
// Open opens FileStorage @path.
//
// TODO read-write support
func Open(ctx context.Context, path string) (_ *FileStorage, err error) {
// open data file
fs, err := open(path)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
fs.file.Close() // XXX lclose
}
}()
// load/rebuild index
err = fs.loadIndex()
if err != nil {
log.Print(err)
log.Printf("%s: index recompute...", path)
// XXX if !ro -> .reindex() which saves it
fs.index, err = fs.computeIndex(ctx)
if err != nil {
return nil, err
}
}
// TODO verify index is sane / topPos matches
// XXX zodb/py iirc scans 10 transactions back and verifies index against it
// XXX also if index is not good - it has to be just rebuild without open error
if fs.index.TopPos != fs.txnhMax.Pos + fs.txnhMax.Len {
return nil, fmt.Errorf("%s: inconsistent index topPos (TODO rebuild index)", path)
}
return fs, nil
}
func (fs *FileStorage) Close() error {
// TODO dump index if !ro
err := fs.file.Close()
if err != nil {
return err
}
fs.file = nil
return nil
}
func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err error) { func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err error) {
// XXX lock? // XXX lock?
......
...@@ -33,7 +33,7 @@ import ( ...@@ -33,7 +33,7 @@ import (
"lab.nexedi.com/kirr/go123/xbytes" "lab.nexedi.com/kirr/go123/xbytes"
) )
// FileHeader represents file header // FileHeader represents header of whole data file
type FileHeader struct { type FileHeader struct {
Magic [4]byte Magic [4]byte
} }
...@@ -51,7 +51,7 @@ type TxnHeader struct { ...@@ -51,7 +51,7 @@ type TxnHeader struct {
// underlying memory for header loading and for user/desc/extension strings // underlying memory for header loading and for user/desc/extension strings
// invariant: after successful TxnHeader load len(.workMem) = lenUser + lenDesc + lenExt // invariant: after successful TxnHeader load len(.workMem) = lenUser + lenDesc + lenExt
// as specified by on-disk header // as specified by on-disk header.
workMem []byte workMem []byte
} }
...@@ -60,16 +60,12 @@ type DataHeader struct { ...@@ -60,16 +60,12 @@ type DataHeader struct {
Pos int64 // position of data record start Pos int64 // position of data record start
Oid zodb.Oid Oid zodb.Oid
Tid zodb.Tid Tid zodb.Tid
// XXX -> .PosPrevRev .PosTxn .LenData // XXX -> .PosPrevRev .PosTxn .LenData?
PrevRevPos int64 // position of this oid's previous-revision data record XXX naming PrevRevPos int64 // position of this oid's previous-revision data record
TxnPos int64 // position of transaction record this data record belongs to TxnPos int64 // position of transaction record this data record belongs to
//_ uint16 // 2-bytes with zero values. (Was version length.) //_ uint16 // 2-bytes with zero values. (Was version length.)
DataLen int64 // length of following data. if 0 -> following = 8 bytes backpointer DataLen int64 // length of following data. if 0 -> following = 8 bytes backpointer
// if backpointer == 0 -> oid deleted // if backpointer == 0 -> oid deleted
//Data []byte
//DataRecPos uint64 // if Data == nil -> byte position of data record containing data
// XXX include word0 ?
// underlying memory for header loading (to avoid allocations) // underlying memory for header loading (to avoid allocations)
workMem [DataHeaderSize]byte workMem [DataHeaderSize]byte
...@@ -84,8 +80,8 @@ const ( ...@@ -84,8 +80,8 @@ const (
txnXHeaderFixSize = 8 + TxnHeaderFixSize // ^^^ with trail LenPrev from previous record txnXHeaderFixSize = 8 + TxnHeaderFixSize // ^^^ with trail LenPrev from previous record
DataHeaderSize = 8+8+8+8+2+8 DataHeaderSize = 8+8+8+8+2+8
// txn/data pos that are < vvv are for sure invalid // txn/data pos that if < vvv are for sure invalid
txnValidFrom = int64(len(Magic)) // XXX = FileHeaderSize txnValidFrom = FileHeaderSize
dataValidFrom = txnValidFrom + TxnHeaderFixSize dataValidFrom = txnValidFrom + TxnHeaderFixSize
// invalid length that indicates start of iteration for TxnHeader LoadNext/LoadPrev // invalid length that indicates start of iteration for TxnHeader LoadNext/LoadPrev
...@@ -123,14 +119,15 @@ func (e *DataError) Error() string { ...@@ -123,14 +119,15 @@ func (e *DataError) Error() string {
} }
// err creates DataError for data record located at dh.Pos // err creates DataError for data record located at dh.Pos
//
// XXX add link to containing txn? (check whether we can do it on data access) ? // XXX add link to containing txn? (check whether we can do it on data access) ?
func (dh *DataHeader) err(subj string, err error) error { func (dh *DataHeader) err(subj string, err error) error {
return &DataError{dh.Pos, subj, err} return &DataError{dh.Pos, subj, err}
} }
// ierr is an interface for something which can create errors // ierr is an interface for something which can create errors.
// it is used by TxnHeader and DataHeader to create appropriate errors with their context // it is used by TxnHeader and DataHeader to create appropriate errors with their context.
type ierr interface { type ierr interface {
err(subj string, err error) error err(subj string, err error) error
} }
...@@ -162,7 +159,7 @@ func (fh *FileHeader) Load(r io.ReaderAt) error { ...@@ -162,7 +159,7 @@ func (fh *FileHeader) Load(r io.ReaderAt) error {
return err return err
} }
if string(fh.Magic[:]) != Magic { if string(fh.Magic[:]) != Magic {
return fmt.Errorf("%s: invalid magic %q", xio.Name(r), fh.Magic) return fmt.Errorf("%s: invalid fs1 magic %q", xio.Name(r), fh.Magic)
} }
return nil return nil
...@@ -557,9 +554,7 @@ func (dh *DataHeader) loadPrevRev(r io.ReaderAt) error { ...@@ -557,9 +554,7 @@ func (dh *DataHeader) loadPrevRev(r io.ReaderAt) error {
// LoadBackRef reads data for the data record and decodes it as backpointer reference. // LoadBackRef reads data for the data record and decodes it as backpointer reference.
// //
// prerequisite: dh loaded and .LenData == 0 (data record with back-pointer) // prerequisite: dh loaded and .LenData == 0 (data record with back-pointer).
// XXX return backPos=-1 if err?
// XXX unused?
func (dh *DataHeader) LoadBackRef(r io.ReaderAt) (backPos int64, err error) { func (dh *DataHeader) LoadBackRef(r io.ReaderAt) (backPos int64, err error) {
if dh.DataLen != 0 { if dh.DataLen != 0 {
bug(dh, "LoadBack() on non-backpointer data header") bug(dh, "LoadBack() on non-backpointer data header")
......
...@@ -41,7 +41,7 @@ func Reindex(ctx context.Context, path string, progress func(*fs1.IndexUpdatePro ...@@ -41,7 +41,7 @@ func Reindex(ctx context.Context, path string, progress func(*fs1.IndexUpdatePro
err = index.SaveFile(path + ".index") // XXX show progress during SaveFile? err = index.SaveFile(path + ".index") // XXX show progress during SaveFile?
if err != nil { if err != nil {
return err // XXX err ctx return err
} }
return nil return nil
...@@ -139,7 +139,7 @@ func VerifyIndexFor(ctx context.Context, path string, ntxn int, progress func(*f ...@@ -139,7 +139,7 @@ func VerifyIndexFor(ctx context.Context, path string, ntxn int, progress func(*f
// XXX lock path.lock ? // XXX lock path.lock ?
index, err := fs1.LoadIndexFile(path + ".index") index, err := fs1.LoadIndexFile(path + ".index")
if err != nil { if err != nil {
return err // XXX err ctx return err
} }
_, err = index.VerifyForFile(context.Background(), path, ntxn, progress) _, err = index.VerifyForFile(context.Background(), path, ntxn, progress)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment