Commit 35eb77b2 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 4c621541
...@@ -87,6 +87,7 @@ type FileStorage struct { ...@@ -87,6 +87,7 @@ type FileStorage struct {
index *Index // oid -> data record position in transaction which last changed oid index *Index // oid -> data record position in transaction which last changed oid
// transaction headers for min/max transactions committed // transaction headers for min/max transactions committed
// (both with .Len=0 & .Tid=0 if database is empty)
// XXX keep loaded with LoadNoStrings ? // XXX keep loaded with LoadNoStrings ?
txnhMin TxnHeader txnhMin TxnHeader
txnhMax TxnHeader txnhMax TxnHeader
...@@ -107,13 +108,11 @@ func (fs *FileStorage) URL() string { ...@@ -107,13 +108,11 @@ func (fs *FileStorage) URL() string {
func (fs *FileStorage) LastTid(_ context.Context) (zodb.Tid, error) { func (fs *FileStorage) LastTid(_ context.Context) (zodb.Tid, error) {
// XXX check we have transactions at all - what to return if not?
// XXX must be under lock // XXX must be under lock
return fs.txnhMax.Tid, nil return fs.txnhMax.Tid, nil // txnhMax.Tid = 0, if empty
} }
func (fs *FileStorage) LastOid(_ context.Context) (zodb.Oid, error) { func (fs *FileStorage) LastOid(_ context.Context) (zodb.Oid, error) {
// XXX check we have objects at all - what to return if not?
// XXX must be under lock // XXX must be under lock
// XXX what if an oid was deleted? // XXX what if an oid was deleted?
lastOid, _ := fs.index.Last() // returns zero-value, if empty lastOid, _ := fs.index.Last() // returns zero-value, if empty
...@@ -240,14 +239,11 @@ const ( ...@@ -240,14 +239,11 @@ const (
func (zi *zIter) NextTxn(_ context.Context) (*zodb.TxnInfo, zodb.IDataIterator, error) { func (zi *zIter) NextTxn(_ context.Context) (*zodb.TxnInfo, zodb.IDataIterator, error) {
switch { switch {
case zi.zFlags & zIterEOF != 0: case zi.zFlags & zIterEOF != 0:
//println("already eof")
return nil, nil, io.EOF return nil, nil, io.EOF
// XXX needed?
case zi.zFlags & zIterPreloaded != 0: case zi.zFlags & zIterPreloaded != 0:
// first element is already there - preloaded by who initialized TxnIter // first element is already there - preloaded by who initialized TxnIter
zi.zFlags &= ^zIterPreloaded zi.zFlags &= ^zIterPreloaded
//fmt.Println("preloaded:", zi.Txnh.Tid)
default: default:
err := zi.iter.NextTxn(LoadAll) err := zi.iter.NextTxn(LoadAll)
...@@ -257,10 +253,8 @@ func (zi *zIter) NextTxn(_ context.Context) (*zodb.TxnInfo, zodb.IDataIterator, ...@@ -257,10 +253,8 @@ func (zi *zIter) NextTxn(_ context.Context) (*zodb.TxnInfo, zodb.IDataIterator,
} }
} }
// XXX how to make sure last good txnh is preserved?
if (zi.iter.Dir == IterForward && zi.iter.Txnh.Tid > zi.tidStop) || if (zi.iter.Dir == IterForward && zi.iter.Txnh.Tid > zi.tidStop) ||
(zi.iter.Dir == IterBackward && zi.iter.Txnh.Tid < zi.tidStop) { (zi.iter.Dir == IterBackward && zi.iter.Txnh.Tid < zi.tidStop) {
//println("-> EOF")
zi.zFlags |= zIterEOF zi.zFlags |= zIterEOF
return nil, nil, io.EOF return nil, nil, io.EOF
} }
...@@ -313,29 +307,26 @@ func (e *iterStartError) NextTxn(_ context.Context) (*zodb.TxnInfo, zodb.IDataIt ...@@ -313,29 +307,26 @@ func (e *iterStartError) NextTxn(_ context.Context) (*zodb.TxnInfo, zodb.IDataIt
} }
// findTxnRecord finds smallest transaction record with txn.tid >= tid XXX or <= ? // findTxnRecord finds transaction record with min(txn.tid): txn.tid >= tid
// if there is no such transaction returned TxnHeader will be invalid (.Pos = 0) and error = nil //
// error != nil only on IO error // if there is no such transaction returned error will be EOF.
// XXX ^^^ text
func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, error) { func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, error) {
//fmt.Printf("findTxn %v\n", tid)
// XXX read snapshot under lock // XXX read snapshot under lock
// check for empty database
if fs.txnhMin.Len == 0 { if fs.txnhMin.Len == 0 {
// empty database - no such record // empty database - no such record
return TxnHeader{}, nil return TxnHeader{}, io.EOF
} }
// NOTE cloning to unalias strings memory // now we know the database is not empty and thus txnh min & max are valid
// clone them to unalias strings memory
var tmin, tmax TxnHeader var tmin, tmax TxnHeader
tmin.CloneFrom(&fs.txnhMin) tmin.CloneFrom(&fs.txnhMin)
tmax.CloneFrom(&fs.txnhMax) tmax.CloneFrom(&fs.txnhMax)
// now we know the database is not empty and thus tmin & tmax are valid
if tmax.Tid < tid { if tmax.Tid < tid {
return TxnHeader{}, nil // no such record return TxnHeader{}, io.EOF // no such record
} }
if tmin.Tid >= tid { if tmin.Tid >= tid {
return tmin, nil // tmin satisfies return tmin, nil // tmin satisfies
...@@ -344,7 +335,6 @@ func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, er ...@@ -344,7 +335,6 @@ func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, er
// now we know tid ∈ (tmin, tmax] // now we know tid ∈ (tmin, tmax]
// iterate and scan either from tmin or tmax, depending which way it is // iterate and scan either from tmin or tmax, depending which way it is
// likely closer, to searched tid. // likely closer, to searched tid.
// when iterating use IO optimized for sequential access
iter := &Iter{R: r} iter := &Iter{R: r}
if (tid - tmin.Tid) < (tmax.Tid - tid) { if (tid - tmin.Tid) < (tmax.Tid - tid) {
...@@ -393,25 +383,21 @@ func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, er ...@@ -393,25 +383,21 @@ func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, er
// Iterate creates zodb-level iterator for tidMin..tidMax range // Iterate creates zodb-level iterator for tidMin..tidMax range
func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.ITxnIterator { func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.ITxnIterator {
//fmt.Printf("iterate %v..%v\n", tidMin, tidMax)
// when iterating use IO optimized for sequential access // when iterating use IO optimized for sequential access
// XXX -> IterateRaw ?
fsSeq := xbufio.NewSeqReaderAt(fs.file) fsSeq := xbufio.NewSeqReaderAt(fs.file)
ziter := &zIter{iter: Iter{R: fsSeq}} ziter := &zIter{iter: Iter{R: fsSeq}}
iter := &ziter.iter iter := &ziter.iter
// find first txn : txn.tid >= tidMin // find first txn : txn.tid >= tidMin
txnh, err := fs.findTxnRecord(fsSeq, tidMin) txnh, err := fs.findTxnRecord(fsSeq, tidMin)
if err != nil { switch {
return &iterStartError{err} // XXX err ctx case err == io.EOF:
} ziter.zFlags |= zIterEOF // empty
if txnh.Pos == 0 { // XXX -> txnh.Valid() ?
ziter.zFlags |= zIterEOF // empty
return ziter return ziter
}
//fmt.Printf("tidRange: %v..%v -> found %v @%v\n", tidMin, tidMax, txnh.Tid, txnh.Pos) case err != nil:
return &iterStartError{err}
}
// setup iter from what findTxnRecord found // setup iter from what findTxnRecord found
iter.Txnh = txnh iter.Txnh = txnh
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment