Commit 55d3198d authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a5d85899
......@@ -6,17 +6,22 @@ package fs1
import (
"io"
)
// FIXME things are not so good with fstail (backward iteration in lock-step)
"log"
)
// SeqBufReader implements buffering for a io.ReaderAt optimized for sequential access
// FIXME access from multiple goroutines? (it is required per io.ReaderAt
// interface, but for sequential workloads we do not need it)
// XXX -> xbufio.SeqReader
type SeqBufReader struct {
// buffer for data at pos. cap(buf) - whole buffer capacity
buf []byte
pos int64
// position of last IO (can be != .pos because large reads are not buffered)
posLastIO int64
r io.ReaderAt
}
......@@ -28,110 +33,143 @@ func NewSeqBufReader(r io.ReaderAt) *SeqBufReader {
}
func NewSeqBufReaderSize(r io.ReaderAt, size int) *SeqBufReader {
sb := &SeqBufReader{r: r, pos: 0, buf: make([]byte, 0, size)}
// XXX posLastIO - to which to init ?
sb := &SeqBufReader{r: r, pos: 0, buf: make([]byte, 0, size), posLastIO: 0}
return sb
}
// readFromBuf reads as much as possible for ReadAt(p, pos) request from buffered data
// it returns nread and (p', pos') that was left for real reading to complete
func (sb *SeqBufReader) readFromBuf(p []byte, pos int64) (int, []byte, int64) {
n := 0
// use buffered data: start + forward
if sb.pos <= pos && pos < sb.pos + int64(len(sb.buf)) {
n = copy(p, sb.buf[pos - sb.pos:]) // NOTE len(p) can be < len(sb[copyPos:])
p = p[n:]
pos += int64(n)
// use buffered data: tail + backward
} else if posAfter := pos + int64(len(p));
len(p) != 0 &&
sb.pos < posAfter && posAfter <= sb.pos + int64(len(sb.buf)) {
// here we know pos < sb.pos
//
// proof: consider if pos >= sb.pos.
// Then from `pos <= sb.pos + len(sb.buf) - len(p)` above it follow that:
// `pos < sb.pos + len(sb.buf)` (NOTE strictly < because if len(p) > 0)
// and we come to condition which is used in `start + forward` if
n = copy(p[sb.pos - pos:], sb.buf) // NOTE n == len(p[sb.pos - pos:])
p = p[:sb.pos - pos]
// pos for actual read stays the same
}
return n, p, pos
// XXX temp
func init() {
log.SetFlags(0)
}
func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (n int, err error) {
func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) {
// if request size > buffer - read data directly
if len(p) > cap(sb.buf) {
// no copying from sb.buf here at all as if e.g. we could copy from sb.buf, the
// kernel can copy the same data from pagecache as well, and it will take the same time
// because for data in sb.buf corresponding page in pagecache has high p. to be hot.
log.Printf("READ [%v, %v)\t#%v", pos, pos + len64(p), len(p))
sb.posLastIO = pos
return sb.r.ReadAt(p, pos)
}
var nhead int // #data read from buffer for p head
var ntail int // #data read from buffer for p tail
// try to satisfy read request via (partly) reading from buffer
//n, p, pos = sb.readFromBuf(p, pos)
// ---- 8< ---- (inlined readFromBuf)
switch {
// use buffered data: start + forward
if sb.pos <= pos && pos < sb.pos + int64(len(sb.buf)) {
n = copy(p, sb.buf[pos - sb.pos:]) // NOTE len(p) can be < len(sb[copyPos:])
p = p[n:]
pos += int64(n)
case sb.pos <= pos && pos < sb.pos + len64(sb.buf):
nhead = copy(p, sb.buf[pos - sb.pos:]) // NOTE len(p) can be < len(sb[copyPos:])
// if all was read from buffer - we are done
if nhead == len(p) {
return nhead, nil
}
p = p[nhead:]
pos += int64(nhead)
// emptry request (possibly not hitting buffer - do not let it go to real IO path)
// `len(p) != 0` is also needed for backward reading from buffer, so this condition goes before
case len(p) == 0:
return 0, nil
// use buffered data: tail + backward
} else if posAfter := pos + int64(len(p));
len(p) != 0 &&
sb.pos < posAfter && posAfter <= sb.pos + int64(len(sb.buf)) {
case posAfter := pos + len64(p);
sb.pos < posAfter && posAfter <= sb.pos + len64(sb.buf):
// here we know pos < sb.pos
//
// proof: consider if pos >= sb.pos.
// Then from `pos <= sb.pos + len(sb.buf) - len(p)` above it follow that:
// `pos < sb.pos + len(sb.buf)` (NOTE strictly < because if len(p) > 0)
// and we come to condition which is used in `start + forward` if
ntail = copy(p[sb.pos - pos:], sb.buf) // NOTE ntail == len(p[sb.pos - pos:])
// NOTE no return here: full p read is impossible for backward
// p filling: it would mean `pos = sb.pos` which in turn means
// the condition for forward buf reading must have been triggered.
n = copy(p[sb.pos - pos:], sb.buf) // NOTE n == len(p[sb.pos - pos:])
p = p[:sb.pos - pos]
// pos for actual read stays the same
// pos stays the same
}
// ---- 8< ----
// if all was read from buffer - we are done
if len(p) == 0 {
return n, nil
}
// otherwise we need to refill the buffer. determine range to read by current IO direction.
//
// XXX strictly speaking it is better to compare pos vs pos(last_IO).
// when there were big read requests which don't go through buffer, sb.pos remains not updated
// and this, on direction change, can result on 1 buffered read in the wrong direction.
// but hopefully it is pretty minor and can be ignored.
var xpos int64
if pos >= sb.pos {
// here we need to refill the buffer. determine range to read by current IO direction.
// NOTE len(p) <= cap(sb.buf)
var xpos int64 // position for new IO request
if pos >= sb.posLastIO {
// forward
xpos = pos
} else {
// backward
xpos = pos + int64(len(p)) - int64(cap(sb.buf))
// by default we want to read forward, even when iterating backward
// (there are frequent jumps backward for reading a record there forward)
xpos = pos
// but if this will overlap with last access range, probably
// it is better (we are optimizing for sequential access) to
// shift loading region down not to overlap.
//
// we have to take into account that last and current access regions
// can overlap, if e.g. last access was big non-buffered read.
if xpos + cap64(sb.buf) > sb.posLastIO {
xpos = max64(sb.posLastIO, xpos + len64(p)) - cap64(sb.buf)
}
// don't let reading go beyond start of the file
if xpos < 0 {
xpos = 0
}
}
log.Printf("read [%v, %v)\t#%v", xpos, xpos + cap64(sb.buf), cap(sb.buf))
sb.posLastIO = xpos
nn, err := sb.r.ReadAt(sb.buf[:cap(sb.buf)], xpos)
// nothing read - just return the error
if nn == 0 {
return n, err
return nhead, err
}
// even if there was an error, but data partly read, we remember it in the buffer
sb.pos = xpos
sb.buf = sb.buf[:nn]
// here we know:
// - some data was read
// - in case of successful read pos/p lies completely inside sb.pos/sb.buf
// copy loaded data from buffer to p
pBufOffset := pos - xpos // offset corresponding to p in sb.buf XXX naming
if pBufOffset >= len64(sb.buf) {
// this can be only when for backward reading there was EIO
// before data covering pos/p. Just return the error
return nhead, err
}
nn = copy(p, sb.buf[pBufOffset:])
if nn < len(p) {
// some error - do not account tail - we did not get to it
return nhead + nn, err
}
// all ok
// NOTE if there was an error - we can skip it if original read request was completely satisfied
// NOTE not preserving EOF at ends - not required per ReaderAt interface
return nhead + nn + ntail, nil
/*
// here we know:
// - some data was read
// - len(p) < cap(sb.buf)
// - len(p) <= cap(sb.buf)
// - there is overlap in between pos/p vs sb.pos/sb.buf
// try to read again what is left to read from the buffer
// nn, p, pos = sb.readFromBuf(p, pos)
......@@ -170,4 +208,17 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (n int, err error) {
// all done
return n, err
*/
}
// utilities:
// len and cap as int64 (we frequently need them and int64 is covering int so
// the conversion is not lossy)
func len64(b []byte) int64 { return int64(len(b)) }
func cap64(b []byte) int64 { return int64(cap(b)) }
// min/max
func min64(a, b int64) int64 { if a < b { return a } else { return b} }
func max64(a, b int64) int64 { if a > b { return a } else { return b} }
......@@ -40,7 +40,7 @@ func (r *XReader) ReadAt(p []byte, pos int64) (n int, err error) {
// read @pos/len -> rb.pos, len(rb.buf)
var xSeqBufTestv = []struct {pos int64; Len int; bufPos int64; bufLen int} {
var xSeqBufTestv = []struct {pos int64; Len int; bufPos int64; bufLen int} { // XXX add ioPos ?
{40, 5, 40, 10}, // 1st access, forward by default
{45, 7, 50, 10}, // part taken from buf, part read next, forward
{52, 5, 50, 10}, // everything taken from buf
......@@ -56,11 +56,18 @@ var xSeqBufTestv = []struct {pos int64; Len int; bufPos int64; bufLen int} {
{180,70, 105, 10}, // big access ~ forward
{170,11, 105, 10}, // big access backward
{160,11, 105, 10}, // big access backward, once more
{155, 5, 155, 10}, // access backward - buffer refilled
{155, 5, 150, 10}, // access backward - buffer refilled taking last IO into account
// XXX refilled forward first time after big backward readings
{150, 5, 145, 10}, // next access backward - buffer refilled backward
{143, 7, 135, 10}, // backward once again - buffer refilled backward
// TODO backward after not big-backward (after regular forward)
// TODO backward after big-backward overlapping
// TODO backward over EIO - check returned n
// TODO backward after forward when posLastAccess is in forward tail
// TODO zero-sized out-of-buffer read do not change buffer
{250, 4, 250, 6}, // access near EOF - buffer fill hits EOF, but not returns it to client
{254, 5, 250, 6}, // access overlapping EOF - EOF returned
{256, 1, 250, 6}, // access past EOF -> EOF
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment