Commit 71ba6f4e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 967830c5
...@@ -496,6 +496,7 @@ func (dh *DataHeader) loadPrevRev(r io.ReaderAt /* *os.File */) error { ...@@ -496,6 +496,7 @@ func (dh *DataHeader) loadPrevRev(r io.ReaderAt /* *os.File */) error {
// LoadBack reads and decodes data header for revision linked via back-pointer // LoadBack reads and decodes data header for revision linked via back-pointer
// prerequisite: dh XXX .DataLen == 0 // prerequisite: dh XXX .DataLen == 0
// if link is to zero (means deleted record) io.EOF is returned
func (dh *DataHeader) LoadBack(r io.ReaderAt /* *os.File */) error { func (dh *DataHeader) LoadBack(r io.ReaderAt /* *os.File */) error {
if dh.DataLen != 0 { if dh.DataLen != 0 {
bug(dh, "LoadBack() on non-backpointer data header") bug(dh, "LoadBack() on non-backpointer data header")
...@@ -508,13 +509,15 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt /* *os.File */) error { ...@@ -508,13 +509,15 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt /* *os.File */) error {
} }
backPos := int64(binary.BigEndian.Uint64(xxx[:])) backPos := int64(binary.BigEndian.Uint64(xxx[:]))
if backPos == 0 {
return io.EOF // oid was deleted
}
if backPos < dataValidFrom { if backPos < dataValidFrom {
return decodeErr(dh, "invalid backpointer: %v", backPos) return decodeErr(dh, "invalid backpointer: %v", backPos)
} }
if backPos + DataHeaderSize > dh.TxnPos - 8 { if backPos + DataHeaderSize > dh.TxnPos - 8 {
return decodeErr(dh, "backpointer (%v) overlaps with txn (%v)", backPos, dh.TxnPos) return decodeErr(dh, "backpointer (%v) overlaps with txn (%v)", backPos, dh.TxnPos)
} }
// TODO backPos can be also == 0 - (means deleted rev)
posCur := dh.Pos posCur := dh.Pos
tid := dh.Tid tid := dh.Tid
...@@ -545,8 +548,6 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt /* *os.File */) error { ...@@ -545,8 +548,6 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt /* *os.File */) error {
// LoadNext reads and decodes data header for next data record in the same transaction // LoadNext reads and decodes data header for next data record in the same transaction
// prerequisite: dh .Pos .DataLen are initialized // prerequisite: dh .Pos .DataLen are initialized
// when there is no more data records: io.EOF is returned // when there is no more data records: io.EOF is returned
//
// XXX NOTE(self): iteration starts with {Pos: txnh.Pos, DataLen: -DataHeaderSize}
func (dh *DataHeader) LoadNext(r io.ReaderAt /* *os.File */, txnh *TxnHeader) error { func (dh *DataHeader) LoadNext(r io.ReaderAt /* *os.File */, txnh *TxnHeader) error {
err := dh.loadNext(r, txnh) err := dh.loadNext(r, txnh)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
...@@ -588,6 +589,37 @@ func (dh *DataHeader) loadNext(r io.ReaderAt /* *os.File */, txnh *TxnHeader) er ...@@ -588,6 +589,37 @@ func (dh *DataHeader) loadNext(r io.ReaderAt /* *os.File */, txnh *TxnHeader) er
return nil return nil
} }
// LoadData loads data for the data record taking backpointers into account
// Data is loaded into *buf, which, if needed, is reallocated to hold all loading data size XXX
// NOTE on success dh state is changed to data header of original data transaction
// TODO buf -> slab
func (dh *DataHeader) LoadData(r io.ReaderAt /* *os.File */, buf *[]byte) error {
// scan via backpointers
for dh.DataLen == 0 {
err := dh.LoadBack(r)
if err != nil {
if err == io.EOF {
*buf = nil // deleted
return nil
}
return err // XXX recheck
}
}
// now read actual data
if int64(cap(*buf)) < dh.DataLen {
*buf = make([]byte, dh.DataLen)
} else {
*buf = (*buf)[:dh.DataLen]
}
_, err := r.ReadAt(*buf, dh.Pos + DataHeaderSize)
if err != nil {
return dh.err("read data", noEOF(err)) // XXX recheck
}
return nil
}
// Open opens FileStorage XXX text // Open opens FileStorage XXX text
func Open(path string) (*FileStorage, error) { func Open(path string) (*FileStorage, error) {
fs := &FileStorage{} fs := &FileStorage{}
...@@ -696,19 +728,10 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) ...@@ -696,19 +728,10 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error)
// be of first-found transaction // be of first-found transaction
tid = dh.Tid tid = dh.Tid
// scan via backpointers // TODO data -> slab
for dh.DataLen == 0 { err = dh.LoadData(fs.file, &data)
err = dh.LoadBack(fs.file)
if err != nil {
panic(err)
}
}
// now read actual data
data = make([]byte, dh.DataLen) // TODO -> slab ?
_, err = fs.file.ReadAt(data, dh.Pos + DataHeaderSize)
if err != nil { if err != nil {
return nil, zodb.Tid(0), &ErrXidLoad{xid, noEOF(err)} return nil, zodb.Tid(0), &ErrXidLoad{xid, err}
} }
return data, tid, nil return data, tid, nil
...@@ -736,61 +759,111 @@ type txnIter struct { ...@@ -736,61 +759,111 @@ type txnIter struct {
Txnh TxnHeader // current transaction information Txnh TxnHeader // current transaction information
TidStop zodb.Tid // iterate up to tid <= tidStop | tid >= tidStop depending on .dir TidStop zodb.Tid // iterate up to tid <= tidStop | tid >= tidStop depending on .dir
Dir int // iterate forward (> 0) / backward (< 0) / EOF reached (== 0) Flags iterFlags // iterate forward (> 0) / backward (< 0) / EOF reached (== 0)
} }
func (fi *txnIter) NextTxn(flags TxnLoadFlags) error { type iterFlags int
var err error const (
iterDir iterFlags = 1 << iota // iterate forward (1) or backward (0)
iterEOF // EOF reached
iterPreloaded // data for this iteration was alrady preloaded
)
func (ti *txnIter) NextTxn(flags TxnLoadFlags) error {
switch { switch {
case fi.Dir & 0x10 != 0: case ti.Flags & iterEOF != 0:
// first element is already there - preloaded by who initialized txnIter return io.EOF
fi.Dir &= ^0x10
return nil
case fi.Dir > 0:
err = fi.Txnh.LoadNext(fi.fs.file, flags)
case fi.Dir < 0: case ti.Flags & iterPreloaded != 0:
err = fi.Txnh.LoadPrev(fi.fs.file, flags) // first element is already there - preloaded by who initialized txnIter
ti.Flags &= ^iterPreloaded
fmt.Println("preloaded:", ti.Txnh.Tid)
default: // fi.Dir == 0: default:
return io.EOF var err error
if ti.Flags & iterDir != 0 {
err = ti.Txnh.LoadNext(ti.fs.file, flags)
} else {
err = ti.Txnh.LoadPrev(ti.fs.file, flags)
} }
// XXX EOF ^^^ is not expected (range pre-cut to valid tids) ?
fmt.Println("loaded:", ti.Txnh.Tid)
if err != nil { if err != nil {
return err return err
} }
}
// XXX how to make sure last good txnh is preserved? // XXX how to make sure last good txnh is preserved?
if (fi.Dir > 0 && fi.Txnh.Tid > fi.TidStop) || if (ti.Flags&iterDir != 0 && ti.Txnh.Tid > ti.TidStop) ||
(fi.Dir < 0 && fi.Txnh.Tid < fi.TidStop) { (ti.Flags&iterDir == 0 && ti.Txnh.Tid < ti.TidStop) {
fi.Dir = 0 ti.Flags |= iterEOF
return io.EOF return io.EOF
} }
return nil return nil
} }
// dataIter is iterator over data records inside one transaction
type dataIter struct {
fs *FileStorage
type Iterator struct { Txnh *TxnHeader // header of transaction we are iterating inside
txnIter txnIter Datah DataHeader
sri zodb.StorageRecordInformation // ptr to this will be returned by NextData
dataBuf []byte
} }
func (fsi *Iterator) NextTxn(txnInfo *zodb.TxnInfo) (dataIter zodb.IStorageRecordIterator, err error) { func (di *dataIter) NextData() (*zodb.StorageRecordInformation, error) {
err = fsi.txnIter.NextTxn(LoadAll) err := di.Datah.LoadNext(di.fs.file, di.Txnh)
if err != nil { if err != nil {
return nil, err // XXX recheck return nil, err // XXX recheck
} }
*txnInfo = fsi.txnIter.Txnh.TxnInfo di.sri.Oid = di.Datah.Oid
di.sri.Tid = di.Datah.Tid
dh := di.Datah
di.sri.Data = di.dataBuf
err = dh.LoadData(di.fs.file, &di.sri.Data)
if err != nil {
return nil, err // XXX recheck
}
// if memory was reallocated - use it next time
if cap(di.sri.Data) > cap(di.dataBuf) {
di.dataBuf = di.sri.Data
}
di.sri.DataTid = dh.Tid
return &di.sri, nil
}
// Iterator is transaction/data-records iterator as specified by zodb.IStorage
type Iterator struct {
txnIter txnIter
dataIter dataIter
}
func (fsi *Iterator) NextTxn() (*zodb.TxnInfo, zodb.IStorageRecordIterator, error) {
err := fsi.txnIter.NextTxn(LoadAll)
if err != nil {
return nil, nil, err // XXX recheck
}
// TODO set dataIter // TODO set dataIter
return nil /*dataIter*/, nil //
// XXX NOTE(self): iteration starts with {Pos: txnh.Pos, DataLen: -DataHeaderSize}
return &fsi.txnIter.Txnh.TxnInfo, &fsi.dataIter, nil
} }
func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
fmt.Printf("\nIterate %v..%v\n", tidMin, tidMax)
// FIXME case when only 0 or 1 txn present // FIXME case when only 0 or 1 txn present
if tidMin < fs.txnhMin.Tid { if tidMin < fs.txnhMin.Tid {
tidMin = fs.txnhMin.Tid tidMin = fs.txnhMin.Tid
...@@ -802,15 +875,24 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { ...@@ -802,15 +875,24 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
// -> XXX empty // -> XXX empty
} }
// XXX naming
Iter := Iterator{}
Iter.txnIter.fs = fs
Iter.dataIter.fs = fs
Iter.dataIter.Txnh = &Iter.txnIter.Txnh
// scan either from file start or end, depending which way it is likely closer, to tidMin // scan either from file start or end, depending which way it is likely closer, to tidMin
iter := txnIter{fs: fs} // XXX put iter into ptr to Iter ^^^
iter := &Iter.txnIter
if (tidMin - fs.txnhMin.Tid) < (fs.txnhMax.Tid - tidMin) { if (tidMin - fs.txnhMin.Tid) < (fs.txnhMax.Tid - tidMin) {
iter.Dir = +0x11 println("forward")
iter.Flags = 1*iterDir | iterPreloaded
iter.Txnh.CloneFrom(&fs.txnhMin) iter.Txnh.CloneFrom(&fs.txnhMin)
iter.TidStop = tidMin iter.TidStop = tidMin - 1 // XXX overflow
} else { } else {
iter.Dir = -0x11 println("backward")
iter.Flags = 0*iterDir | iterPreloaded
iter.Txnh.CloneFrom(&fs.txnhMax) iter.Txnh.CloneFrom(&fs.txnhMax)
iter.TidStop = tidMin iter.TidStop = tidMin
} }
...@@ -830,15 +912,20 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { ...@@ -830,15 +912,20 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
} }
fmt.Printf("tidRange: %v..%v -> found %v @%v\n", tidMin, tidMax, iter.Txnh.Tid, iter.Txnh.Pos) fmt.Printf("tidRange: %v..%v -> found %v @%v\n", tidMin, tidMax, iter.Txnh.Tid, iter.Txnh.Pos)
return nil
// // prepare to start iterating from found transaction // where to start around tidMin found - let's reinitialize iter to
// // XXX loadStrings() on first step ? // iterate appropriately forward up to tidMax
// iter.Txnh.Tid iter.Flags &= ^iterEOF
// if iter.Flags&iterDir != 0 {
// // txnh should have .Tid <= tidMin but next txn's .Tid is > tidMin // when ^^^ we were searching forward first txn was already found
// posStart := iter.txnPos err = iter.Txnh.loadStrings(fs.file) // XXX ok? XXX -> move NextTxn() ?
// if t if err != nil {
// panic(err) // XXX
// return &FileStorageIterator{-1, tidMin, tidMax} // XXX -1 ok ? }
iter.Flags |= iterPreloaded
}
iter.Flags |= iterDir
iter.TidStop = tidMax
return &Iter
} }
...@@ -129,11 +129,11 @@ func TestLoad(t *testing.T) { ...@@ -129,11 +129,11 @@ func TestLoad(t *testing.T) {
// expect error / panic or empty iteration ? // expect error / panic or empty iteration ?
} }
txni := zodb.TxnInfo{} //txni := zodb.TxnInfo{}
datai := zodb.StorageRecordInformation{} //datai := zodb.StorageRecordInformation{}
for k := 0; ; k++ { for k := 0; ; k++ {
dataIter, err := iter.NextTxn(&txni) txni, dataIter, err := iter.NextTxn()
if err != nil { if err != nil {
err = okEOF(err) err = okEOF(err)
break break
...@@ -149,11 +149,13 @@ func TestLoad(t *testing.T) { ...@@ -149,11 +149,13 @@ func TestLoad(t *testing.T) {
} }
for { for {
err = dataIter.NextData(&datai) datai, err := dataIter.NextData()
if err != nil { if err != nil {
err = okEOF(err) err = okEOF(err)
break break
} }
_ = datai
} }
// TODO check err // TODO check err
......
...@@ -131,9 +131,11 @@ type TxnInfo struct { ...@@ -131,9 +131,11 @@ type TxnInfo struct {
type StorageRecordInformation struct { type StorageRecordInformation struct {
Oid Oid Oid Oid
Tid Tid Tid Tid
Data []byte Data []byte // nil means: deleted
// XXX .version ?
// XXX .data_txn (The previous transaction id) // original tid data was committed (e.g. in case of undo)
// XXX we don't really need this
DataTid Tid
} }
...@@ -174,14 +176,14 @@ type IStorageIterator interface { ...@@ -174,14 +176,14 @@ type IStorageIterator interface {
// NextTxn yields information about next database transaction: // NextTxn yields information about next database transaction:
// 1. transaction metadata, and // 1. transaction metadata, and
// 2. iterator over transaction data records. // 2. iterator over transaction data records.
// transaction metadata is put into *txnInfo and stays valid until next call to NextTxn(). // transaction metadata stays valid until next call to NextTxn().
// end of iteration is indicated with io.EOF // end of iteration is indicated with io.EOF
NextTxn(txnInfo *TxnInfo) (dataIter IStorageRecordIterator, err error) NextTxn() (*TxnInfo, IStorageRecordIterator, error)
} }
type IStorageRecordIterator interface { // XXX naming -> IRecordIterator type IStorageRecordIterator interface { // XXX naming -> IRecordIterator
// NextData puts information about next storage data record into *dataInfo. // NextData yields information about next storage data record.
// data put into *dataInfo stays valid until next call to NextData(). // returned data stays valid until next call to NextData().
// end of iteration is indicated with io.EOF // end of iteration is indicated with io.EOF
NextData(dataInfo *StorageRecordInformation) error NextData() (*StorageRecordInformation, error)
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment