Commit 03cfbf7e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 1f98a716
...@@ -33,6 +33,7 @@ import ( ...@@ -33,6 +33,7 @@ import (
// SeqBufReader implements buffering for a io.ReaderAt optimized for sequential access // SeqBufReader implements buffering for a io.ReaderAt optimized for sequential access
// Both forward, backward and interleaved forward/backward access patterns are supported XXX
// FIXME access from multiple goroutines? (it is required per io.ReaderAt // FIXME access from multiple goroutines? (it is required per io.ReaderAt
// interface, but for sequential workloads we do not need it) // interface, but for sequential workloads we do not need it)
// XXX -> xbufio.SeqReader // XXX -> xbufio.SeqReader
...@@ -41,8 +42,13 @@ type SeqBufReader struct { ...@@ -41,8 +42,13 @@ type SeqBufReader struct {
buf []byte buf []byte
pos int64 pos int64
// position of last IO (can be != .pos because large reads are not buffered) // // position of last IO (can be != .pos because large reads are not buffered)
posLastIO int64 // posLastIO int64
// TODO text
posLastAccess int64
posLastFwdAfter int64
posLastBackward int64
r io.ReaderAt r io.ReaderAt
} }
...@@ -55,7 +61,7 @@ func NewSeqBufReader(r io.ReaderAt) *SeqBufReader { ...@@ -55,7 +61,7 @@ func NewSeqBufReader(r io.ReaderAt) *SeqBufReader {
} }
func NewSeqBufReaderSize(r io.ReaderAt, size int) *SeqBufReader { func NewSeqBufReaderSize(r io.ReaderAt, size int) *SeqBufReader {
sb := &SeqBufReader{r: r, pos: 0, buf: make([]byte, 0, size), posLastIO: 0} sb := &SeqBufReader{r: r, pos: 0, buf: make([]byte, 0, size)} //, posLastIO: 0}
return sb return sb
} }
...@@ -66,13 +72,25 @@ func NewSeqBufReaderSize(r io.ReaderAt, size int) *SeqBufReader { ...@@ -66,13 +72,25 @@ func NewSeqBufReaderSize(r io.ReaderAt, size int) *SeqBufReader {
// } // }
func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) { func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) {
// read-in last access positions and update them in *sb with current ones for next read
posLastAccess := sb.posLastAccess
posLastFwdAfter := sb.posLastFwdAfter
posLastBackward := sb.posLastBackward
sb.posLastAccess = pos
if pos >= posLastAccess {
sb.posLastFwdAfter = pos + len64(p)
} else {
sb.posLastBackward = pos
}
// if request size > buffer - read data directly // if request size > buffer - read data directly
if len(p) > cap(sb.buf) { if len(p) > cap(sb.buf) {
// no copying from sb.buf here at all as if e.g. we could copy from sb.buf, the // no copying from sb.buf here at all as if e.g. we could copy from sb.buf, the
// kernel can copy the same data from pagecache as well, and it will take the same time // kernel can copy the same data from pagecache as well, and it will take the same time
// because for data in sb.buf corresponding page in pagecache has high p. to be hot. // because for data in sb.buf corresponding page in pagecache has high p. to be hot.
//log.Printf("READ [%v, %v)\t#%v", pos, pos + len64(p), len(p)) //log.Printf("READ [%v, %v)\t#%v", pos, pos + len64(p), len(p))
sb.posLastIO = pos //sb.posLastIO = pos
// TODO update lastAccess & lastFwd/lastBack
return sb.r.ReadAt(p, pos) return sb.r.ReadAt(p, pos)
} }
...@@ -122,13 +140,42 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) { ...@@ -122,13 +140,42 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) {
// NOTE len(p) <= cap(sb.buf) // NOTE len(p) <= cap(sb.buf)
var xpos int64 // position for new IO request var xpos int64 // position for new IO request
if pos >= sb.posLastIO { //if pos >= sb.posLastIO {
if pos >= posLastAccess {
// forward // forward
xpos = pos xpos = pos
// if forward trend continues and buffering can be made adjacent to
// previous forward access - shift reading down right to after it.
xLastAfter := posLastFwdAfter + int64(nhead) // XXX comment
if xLastAfter <= xpos && xpos + len64(p) <= xLastAfter + cap64(sb.buf) {
xpos = xLastAfter
}
// XXX symmetry for "alternatively" in backward case
} else { } else {
// backward // backward
xpos = pos
// if backward trend continues and bufferring would overlap with
// previous backward access - shift reading up right to it.
if xpos < posLastBackward && posLastBackward < xpos + cap64(sb.buf) {
xpos = max64(posLastBackward, xpos + len64(p)) - cap64(sb.buf)
// XXX recheck do we really need this ? ( was added for {122, 6, 121, 10} )
// XXX alternatively even if backward trend does not continue anymore
// but if this will overlap with last access (XXX load) range, probably
// it is better (we are optimizing for sequential access) to
// shift loading region down not to overlap.
} else if xpos + cap64(sb.buf) > posLastAccess {
xpos = max64(posLastAccess, xpos + len64(p)) - cap64(sb.buf)
}
// don't let reading go beyond start of the file
xpos = max64(xpos, 0)
/*
// by default we want to read forward, even when iterating backward: // by default we want to read forward, even when iterating backward:
// there are frequent jumps backward for reading a record there forward // there are frequent jumps backward for reading a record there forward
xpos = pos xpos = pos
...@@ -141,14 +188,15 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) { ...@@ -141,14 +188,15 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) {
// can overlap, if e.g. last access was big non-buffered read. // can overlap, if e.g. last access was big non-buffered read.
if xpos + cap64(sb.buf) > sb.posLastIO { if xpos + cap64(sb.buf) > sb.posLastIO {
xpos = max64(sb.posLastIO, xpos + len64(p)) - cap64(sb.buf) xpos = max64(sb.posLastIO, xpos + len64(p)) - cap64(sb.buf)
}
// don't let reading go beyond start of the file // don't let reading go beyond start of the file
xpos = max64(xpos, 0) xpos = max64(xpos, 0)
}
*/
} }
//log.Printf("read [%v, %v)\t#%v", xpos, xpos + cap64(sb.buf), cap(sb.buf)) //log.Printf("read [%v, %v)\t#%v", xpos, xpos + cap64(sb.buf), cap(sb.buf))
sb.posLastIO = xpos // sb.posLastIO = xpos
nn, err := sb.r.ReadAt(sb.buf[:cap(sb.buf)], xpos) nn, err := sb.r.ReadAt(sb.buf[:cap(sb.buf)], xpos)
// even if there was an error, or data partly read, we cannot retain // even if there was an error, or data partly read, we cannot retain
...@@ -169,9 +217,9 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) { ...@@ -169,9 +217,9 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) {
// - it was backward reading, and // - it was backward reading, and
// - original requst was narrower than buffer // - original requst was narrower than buffer
// try to satisfy it once again directly // try to satisfy it once again directly
if pos != xpos { if pos != xpos { // FIXME pos != xpos no longer means backward
//log.Printf("read [%v, %v)\t#%v", pos, pos + len64(p), len(p)) //log.Printf("read [%v, %v)\t#%v", pos, pos + len64(p), len(p))
sb.posLastIO = pos // sb.posLastIO = pos
nn, err = sb.r.ReadAt(p, pos) nn, err = sb.r.ReadAt(p, pos)
if nn < len(p) { if nn < len(p) {
return nn, err return nn, err
......
...@@ -9,6 +9,8 @@ import ( ...@@ -9,6 +9,8 @@ import (
"errors" "errors"
"io" "io"
"testing" "testing"
"fmt"
) )
...@@ -41,14 +43,15 @@ func (r *XReader) ReadAt(p []byte, pos int64) (n int, err error) { ...@@ -41,14 +43,15 @@ func (r *XReader) ReadAt(p []byte, pos int64) (n int, err error) {
// read @pos/len -> rb.pos, len(rb.buf) // read @pos/len -> rb.pos, len(rb.buf)
var xSeqBufTestv = []struct {pos int64; Len int; bufPos int64; bufLen int} { var xSeqBufTestv = []struct {pos int64; Len int; bufPos int64; bufLen int} {
// TODO add trend / not trend everywhere
{40, 5, 40, 10}, // 1st access, forward by default {40, 5, 40, 10}, // 1st access, forward by default
{45, 7, 50, 10}, // part taken from buf, part read next, forward {45, 7, 50, 10}, // part taken from buf, part read next, forward (trend)
{52, 5, 50, 10}, // everything taken from buf {52, 5, 50, 10}, // everything taken from buf
{57, 5, 60, 10}, // part taken from buf, part read next {57, 5, 60, 10}, // part taken from buf, part read next (trend)
{60, 11, 60, 10}, // access > cap(buf), buf skipped {60, 11, 60, 10}, // access > cap(buf), buf skipped
{71, 11, 60, 10}, // access > cap(buf), once again {71, 11, 60, 10}, // access > cap(buf), once again
{82, 10, 82, 10}, // access = cap(buf), should refill buf {82, 10, 82, 10}, // access = cap(buf), should refill buf
{92, 5, 92, 8}, // next access - should refill buffer, but only up to EIO range {92, 5, 92, 8}, // next access - should refill buffer (trend), but only up to EIO range
{97, 4, 100, 0}, // this triggers user-visible EIO, buffer scratched {97, 4, 100, 0}, // this triggers user-visible EIO, buffer scratched
{101, 5, 101, 0}, // EIO again {101, 5, 101, 0}, // EIO again
{105, 5, 105, 10}, // past EIO range - buffer refilled {105, 5, 105, 10}, // past EIO range - buffer refilled
...@@ -57,7 +60,7 @@ var xSeqBufTestv = []struct {pos int64; Len int; bufPos int64; bufLen int} { ...@@ -57,7 +60,7 @@ var xSeqBufTestv = []struct {pos int64; Len int; bufPos int64; bufLen int} {
{110,70, 98, 2}, // very big access forward, buf untouched {110,70, 98, 2}, // very big access forward, buf untouched
{180,70, 98, 2}, // big access ~ forward {180,70, 98, 2}, // big access ~ forward
{172, 5, 170, 10}, // backward: buffer refilled up to posLastIO {170, 5, 170, 10}, // backward: buffer refilled forward because prev backward reading was below
{168, 4, 160, 10}, // backward: buffer refilled backward {168, 4, 160, 10}, // backward: buffer refilled backward
{162, 6, 160, 10}, // backward: all data read from buffer {162, 6, 160, 10}, // backward: all data read from buffer
{150,12, 160, 10}, // big backward: buf untouched {150,12, 160, 10}, // big backward: buf untouched
...@@ -68,28 +71,48 @@ var xSeqBufTestv = []struct {pos int64; Len int; bufPos int64; bufLen int} { ...@@ -68,28 +71,48 @@ var xSeqBufTestv = []struct {pos int64; Len int; bufPos int64; bufLen int} {
{122, 6, 121, 10}, // backward after forward: buf refilled backward {122, 6, 121, 10}, // backward after forward: buf refilled backward
{131, 9, 131, 10}, // forward again {131, 9, 131, 10}, // forward again
{136,20, 131, 10}, // big forward starting from inside filled buf {136,20, 131, 10}, // big forward starting from inside filled buf
{128, 4, 126, 10}, // backward: buf refilled up to posLastIO {128, 4, 126, 10}, // backward (not trend): buf refilled up to posLastIO
// TODO interleaved forward + back-back-back
// TODO interleaved backward + fwd-fwd-fwd
{5, 4, 5, 10}, // forward near file start {5, 4, 5, 10}, // forward near file start
{2, 3, 0, 10}, // backward: buf does not go beyong 0 {2, 3, 0, 10}, // backward: buf does not go beyong 0
{40, 0, 0, 10}, // zero-sized out-of-buffer read do not change buffer {40, 0, 0, 10}, // zero-sized out-of-buffer read do not change buffer
// backward vs EIO // backward (not trend) vs EIO
{110, 1, 110, 10}, // reset state: forward @110 {108,10, 108, 10}, // reset @108
{105, 7, 100, 0}, // backward client after EIO: buf scratched but read request satisfied { 98, 1, 98, 2}, // backward not overlapping EIO: buf filled < EIO range
{110, 1, 110, 10}, // reset @110 {108,10, 108, 10}, // reset @108
{103, 5, 100, 0}, // backward overlapping tail EIO: buf scratched, EIO -> user { 99, 4, 98, 2}, // backward overlapping head EIO: buf filled < EIO range, EIO -> user
{110, 1, 110, 10}, // reset @110 {108,10, 108, 10}, // reset @108
{101, 2, 100, 0}, // backward inside EIO range: buf scratched, EIO -> user
{108, 1, 108, 10}, // reset @108
{100, 4, 98, 2}, // backward = EIO range: buf filled < EIO range, EIO -> user
{108, 1, 108, 10}, // reset @108
{ 99, 6, 98, 2}, // backward overlapping whole EIO range: buf filled <= EIO range, EIO -> user { 99, 6, 98, 2}, // backward overlapping whole EIO range: buf filled <= EIO range, EIO -> user
{108, 1, 108, 10}, // reset @108 {108,10, 108, 10}, // reset @108
{100, 4, 98, 2}, // backward = EIO range: buf filled < EIO range, EIO -> user
{110,10, 110, 10}, // reset @110
{101, 2, 100, 0}, // backward inside EIO range: buf scratched, EIO -> user
{110,10, 110, 10}, // reset @110
{103, 5, 100, 0}, // backward overlapping tail EIO: buf scratched, EIO -> user
{110,10, 110, 10}, // reset state: forward @110
{105, 7, 100, 0}, // backward client after EIO: buf scratched but read request satisfied
// backward (trend) vs EIO
// NOTE this is reverse of `backward (not trend) vs EIO
{110,10, 110, 10}, // reset state: forward @110
{105, 7, 100, 0}, // backward client after EIO: buf scratched but read request satisfied
{110,10, 110, 10}, // reset @110
{103, 5, 98, 2}, // backward overlapping tail EIO: buf scratched (XXX), EIO -> user
{110,10, 110, 10}, // reset @110
{101, 2, 93, 7}, // backward inside EIO range: buf scratched (XXX), EIO -> user
{108,10, 108, 10}, // reset @108
{100, 4, 94, 6}, // backward = EIO range: buf filled < EIO range, EIO -> user
{108,10, 108, 10}, // reset @108
{ 99, 6, 95, 5}, // backward overlapping whole EIO range: buf filled <= EIO range, EIO -> user
{108,10, 108, 10}, // reset @108
{ 99, 4, 98, 2}, // backward overlapping head EIO: buf filled < EIO range, EIO -> user { 99, 4, 98, 2}, // backward overlapping head EIO: buf filled < EIO range, EIO -> user
{108, 1, 108, 10}, // reset @108 {108,10, 108, 10}, // reset @108
{ 98, 1, 98, 2}, // nackward not overlapping EIO: buf filled < EIO range { 98, 1, 89, 10}, // backward not overlapping EIO: buf filled according to backward trend
{250, 4, 250, 6}, // access near EOF - buffer fill hits EOF, but not returns it to client {250, 4, 250, 6}, // access near EOF - buffer fill hits EOF, but not returns it to client
{254, 5, 256, 0}, // access overlapping EOF - EOF returned, buf scratched {254, 5, 256, 0}, // access overlapping EOF - EOF returned, buf scratched
...@@ -103,6 +126,7 @@ func TestSeqBufReader(t *testing.T) { ...@@ -103,6 +126,7 @@ func TestSeqBufReader(t *testing.T) {
rb := NewSeqBufReaderSize(r, 10) // with 10 it is easier to do/check math for a human rb := NewSeqBufReaderSize(r, 10) // with 10 it is easier to do/check math for a human
for _, tt := range xSeqBufTestv { for _, tt := range xSeqBufTestv {
fmt.Println(tt)
pOk := make([]byte, tt.Len) pOk := make([]byte, tt.Len)
pB := make([]byte, tt.Len) pB := make([]byte, tt.Len)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment