Commit 57c7bd2e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 1408fe57
...@@ -131,6 +131,7 @@ func errf(e xerr, subj, format string, a ...interface{}) error { ...@@ -131,6 +131,7 @@ func errf(e xerr, subj, format string, a ...interface{}) error {
} }
// decodeErr is syntactic shortcut for errf("decode", ...) // decodeErr is syntactic shortcut for errf("decode", ...)
// TODO in many places "decode" -> "selfcheck"
func decodeErr(e xerr, format string, a ...interface{}) error { func decodeErr(e xerr, format string, a ...interface{}) error {
return errf(e, "decode", format, a...) return errf(e, "decode", format, a...)
} }
...@@ -345,6 +346,17 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error { ...@@ -345,6 +346,17 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error {
} }
// Len returns whole data record length
func (dh *DataHeader) Len() int64 {
dataLen := dh.DataLen
if dataLen == 0 {
// XXX -> .DataLen() ?
dataLen = 8 // back-pointer | oid removal
}
return DataHeaderSize + dataLen
}
// load reads and decodes data record header // load reads and decodes data record header
// pos: points to data header start // pos: points to data header start
...@@ -496,6 +508,52 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt /* *os.File */) error { ...@@ -496,6 +508,52 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt /* *os.File */) error {
return err return err
} }
// LoadNext reads and decodes data header for next data record in the same transaction
// prerequisite: dh .Pos .DataLen are initialized
// when there is no more data records: io.EOF is returned
//
// XXX NOTE(self): iteration starts with {Pos: txnh.Pos, DataLen: -DataHeaderSize}
func (dh *DataHeader) LoadNext(r io.ReaderAt /* *os.File */, txnh *TxnHeader) error {
err := dh.loadNext(r, txnh)
if err != nil && err != io.EOF {
err = txnh.err("iterating", err)
}
return err
}
func (dh *DataHeader) loadNext(r io.ReaderAt /* *os.File */, txnh *TxnHeader) error {
// position of txn tail - right after last data record byte
txnTailPos := txnh.Pos + txnh.Len - 8
// NOTE we know nextPos does not overlap txnTailPos - it was checked by
// previous LoadNext()
nextPos := dh.Pos + dh.Len()
if nextPos == txnTailPos {
return io.EOF
}
if nextPos + DataHeaderSize > txnTailPos {
return &ErrDataRecord{nextPos, "decode", fmt.Errorf("data record header overlaps txn boundary")} // XXX
}
err := dh.Load(r, nextPos)
if err != nil {
return err
}
if dh.Tid != txnh.Tid {
return decodeErr(dh, "tid != txn.Tid") // XXX
}
if dh.TxnPos != txnh.Pos {
return decodeErr(dh, "txnPos != txn.Pos") // XXX
}
if dh.Pos + dh.Len() > txnTailPos {
return decodeErr(dh, "data record overlaps txn boundary") // XXX
}
return nil
}
func Open(path string) (*FileStorage, error) { func Open(path string) (*FileStorage, error) {
f, err := os.Open(path) // XXX opens in O_RDONLY f, err := os.Open(path) // XXX opens in O_RDONLY
......
...@@ -105,8 +105,6 @@ func TestLoad(t *testing.T) { ...@@ -105,8 +105,6 @@ func TestLoad(t *testing.T) {
checkLoad(t, fs, xid, expect) checkLoad(t, fs, xid, expect)
} }
return
// check iterating XXX move to separate test ? // check iterating XXX move to separate test ?
// tids we will use for tid{Min,Max} // tids we will use for tid{Min,Max}
tidv := []zodb.Tid{zodb.Tid(0)} tidv := []zodb.Tid{zodb.Tid(0)}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment