Commit acad7a9d authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 779d42dc
...@@ -17,9 +17,9 @@ type SeqBufReader struct { ...@@ -17,9 +17,9 @@ type SeqBufReader struct {
pos int64 pos int64
buf []byte buf []byte
// detected io direction (0 - don't know yet, >0 - forward, <0 - backward) // // detected io direction (0 - don't know yet, >0 - forward, <0 - backward)
// XXX strictly 0, +1, -1 ? // // XXX strictly 0, +1, -1 ?
dir int // dir int
} }
const defaultSeqBufSize = 8192 // XXX retune - must be <= size(L1d) / 2 const defaultSeqBufSize = 8192 // XXX retune - must be <= size(L1d) / 2
...@@ -29,7 +29,7 @@ func NewSeqBufReader(r io.ReaderAt) *SeqBufReader { ...@@ -29,7 +29,7 @@ func NewSeqBufReader(r io.ReaderAt) *SeqBufReader {
} }
func NewSeqBufReaderSize(r io.ReaderAt, size int) *SeqBufReader { func NewSeqBufReaderSize(r io.ReaderAt, size int) *SeqBufReader {
sb := &SeqBufReader{r: r, pos: 0, buf: make([]byte, 0, size), dir: 0} sb := &SeqBufReader{r: r, pos: 0, buf: make([]byte, 0, size)} //, dir: 0}
return sb return sb
} }
...@@ -39,70 +39,48 @@ func NewSeqBufReaderSize(r io.ReaderAt, size int) *SeqBufReader { ...@@ -39,70 +39,48 @@ func NewSeqBufReaderSize(r io.ReaderAt, size int) *SeqBufReader {
func (sb *SeqBufReader) readFromBuf(p []byte, pos int64) (int, []byte, int64) { func (sb *SeqBufReader) readFromBuf(p []byte, pos int64) (int, []byte, int64) {
n := 0 n := 0
switch {
// use buffered data: start + forward // use buffered data: start + forward
if pos >= sb.pos && pos < sb.pos + int64(len(sb.buf)) { case pos >= sb.pos && pos < sb.pos + int64(len(sb.buf)):
copyPos := pos - sb.pos copyPos := pos - sb.pos
n = copy(p, sb.buf[copyPos:]) // NOTE len(p) can be < len(sb[copyPos:]) n = copy(p, sb.buf[copyPos:]) // NOTE len(p) can be < len(sb[copyPos:])
p = p[n:] p = p[n:]
pos += int64(n) pos += int64(n)
sb.dir = +1 // XXX recheck //sb.dir = +1 // XXX recheck
}
// XXX use buffered data: tail + backward // use buffered data: tail + backward
if pos + int64(len(p)) <= sb.pos + int64(len(sb.buf)) && pos + int64(len(p)) > sb.pos { case pos + int64(len(p)) <= sb.pos + int64(len(sb.buf)) && pos + int64(len(p)) > sb.pos:
// TODO // TODO
sb.dir = -1 // XXX recheck //sb.dir = -1 // XXX recheck
} }
return n, p, pos return n, p, pos
} }
// XXX place
func sign(x int64) int {
switch {
case x > 0:
return +1
case x < 0:
return -1
default:
return 0
}
}
func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (n int, err error) { func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (n int, err error) {
// if request size > buffer - read data directly // if request size > buffer - read data directly
if len(p) > cap(sb.buf) { if len(p) > cap(sb.buf) {
sb.dir = sign(pos - sb.pos) // XXX recheck
return sb.r.ReadAt(p, pos) return sb.r.ReadAt(p, pos)
} }
n, p, pos = sb.readFromBuf(p, pos) n, p, pos = sb.readFromBuf(p, pos)
// all was read from buffer // if all was read from buffer - we are done
if len(p) == 0 { if len(p) == 0 {
return n, nil return n, nil
} }
// otherwise we need to refill the buffer. determine range to read by current IO direction. // otherwise we need to refill the buffer. determine range to read by current IO direction.
// XXX recheck .dir handling //
// XXX strictly speaking it is better to compare pos vs pos(last-IO).
// when there were big read requests which don't go through buffer, sb.pos remains not updated
// and this, on direction change, can result on 1 buffered read in the wrong direction
var xpos int64 var xpos int64
switch { if pos > sb.pos {
case sb.dir == 0:
// we don't know direction yet - usually it is first request.
// cover pos/p symmetrically. This way we will give hopefully
// give enough overlap for next read request to determine
// direction.
xpos = pos - int64(cap(sb.buf) - len(p)) / 2
if xpos < 0 {
xpos = 0
}
case sb.dir > 0:
// forward // forward
xpos = pos xpos = pos
} else {
default:
// backward // backward
//xpos = max64(pos - cap(sb.buf), 0) //xpos = max64(pos - cap(sb.buf), 0)
xpos = pos - int64(cap(sb.buf)) xpos = pos - int64(cap(sb.buf))
...@@ -115,9 +93,11 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (n int, err error) { ...@@ -115,9 +93,11 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (n int, err error) {
nn, err := sb.r.ReadAt(buf, xpos) nn, err := sb.r.ReadAt(buf, xpos)
// even if there was an error, e.g. after reading part, we remember data read in buffer // even if there was an error, e.g. after reading part, we remember data read in buffer
// XXX only `if nn > 0` ? // XXX nn == 0 -> return err ?
if nn > 0 {
sb.pos = xpos sb.pos = xpos
sb.buf = buf[:nn] sb.buf = buf[:nn]
}
// here we know: // here we know:
// - some data was read XXX recheck // - some data was read XXX recheck
...@@ -137,3 +117,17 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (n int, err error) { ...@@ -137,3 +117,17 @@ func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (n int, err error) {
// all done // all done
return n, err return n, err
} }
// XXX place ?
func sign(x int64) int {
switch {
case x > 0:
return +1
case x < 0:
return -1
default:
return 0
}
}
...@@ -42,17 +42,21 @@ func TestSeqBufReader(t *testing.T) { ...@@ -42,17 +42,21 @@ func TestSeqBufReader(t *testing.T) {
r := &XReader{} r := &XReader{}
rb := NewSeqBufReaderSize(r, 10) // with 10 it is easier to do/check math for a human rb := NewSeqBufReaderSize(r, 10) // with 10 it is easier to do/check math for a human
// read @pos/len -> rb.pos, len(rb.buf), rb.dir // read @pos/len -> rb.pos, len(rb.buf) //, rb.dir
testv := []struct {pos int64; Len int; bufPos int64; bufLen, bufDir int} { testv := []struct {pos int64; Len int; bufPos int64; bufLen int} { //, bufDir int} {
{40, 5, 38, 10, 0}, // 1st access - symmetrically covered {40, 5, 40, 10}, // 1st access, forward by default
{45, 5, 48, 10, +1}, // part taken from buf, part read next, forward detected {45, 7, 50, 10}, // part taken from buf, part read next, forward detected
{50, 5, 48, 10, +1}, // everything taken from buf {52, 5, 50, 10}, // everything taken from buf
{55, 5, 58, 10, +1}, // part taken from buf, part read next {57, 5, 60, 10}, // part taken from buf, part read next
{60, 10, 58, 10, +1}, // access bigger than buf {60, 11, 60, 10}, // access > cap(buf)
{70, 10, 58, 10, +1}, // access bigger than buf, once again {71, 11, 60, 10}, // access > cap(buf), once again
// XXX big access going backward - detect dir change // XXX big access going backward - detect dir change
// TODO accees around and in error range
// TODO overlap
} }
for _, tt := range testv { for _, tt := range testv {
...@@ -68,8 +72,8 @@ func TestSeqBufReader(t *testing.T) { ...@@ -68,8 +72,8 @@ func TestSeqBufReader(t *testing.T) {
} }
// verify buffer state // verify buffer state
if !(rb.pos == tt.bufPos && len(rb.buf) == tt.bufLen && rb.dir == tt.bufDir) { if !(rb.pos == tt.bufPos && len(rb.buf) == tt.bufLen){ // && rb.dir == tt.bufDir) {
t.Fatalf("%v: -> unexpected buffer state @%v #%v %+d", tt, rb.pos, len(rb.buf), rb.dir) t.Fatalf("%v: -> unexpected buffer state @%v #%v", tt, rb.pos, len(rb.buf)) //, rb.dir)
} }
} }
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment