Commit 7b0c6d79 authored by Han-Wen Nienhuys

fuse: reimplement BufferPool using sync.Pool.

parent b55cc8ad
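For background, the change replaces the hand-rolled free-list bookkeeping with the standard library's sync.Pool. Below is a minimal, self-contained sketch of the sync.Pool pattern the new code relies on; it is not part of this commit, and the 4096-byte size is only illustrative.

package main

import (
	"fmt"
	"sync"
)

func main() {
	// A pool that manufactures 4 KiB byte slices on demand.
	pool := &sync.Pool{
		New: func() interface{} { return make([]byte, 4096) },
	}

	b := pool.Get().([]byte) // reuses a pooled slice, or calls New
	fmt.Println(len(b))      // 4096
	pool.Put(b)              // return the slice for later reuse

	// Note: the pool may discard idle objects at any garbage
	// collection, so Get after Put is not guaranteed to return
	// the same backing array.
}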
fuse/bufferpool.go

 package fuse
 
 import (
-	"fmt"
-	"strings"
 	"sync"
-	"unsafe"
 )
 
 var paranoia bool

@@ -20,9 +17,6 @@ type BufferPool interface {
 	// AllocBuffer. It is not an error to call FreeBuffer() on a slice
 	// obtained elsewhere.
 	FreeBuffer(slice []byte)
-
-	// Return debug information.
-	String() string
 }
 
 type gcBufferPool struct {

@@ -33,10 +27,6 @@ func NewGcBufferPool() BufferPool {
 	return &gcBufferPool{}
 }
 
-func (p *gcBufferPool) String() string {
-	return "gc"
-}
-
 func (p *gcBufferPool) AllocBuffer(size uint32) []byte {
 	return make([]byte, size)
 }

@@ -48,14 +38,7 @@ type bufferPoolImpl struct {
 	lock sync.Mutex
 
 	// For each page size multiple a list of slice pointers.
-	buffersBySize [][][]byte
-
-	// start of slice => true
-	outstandingBuffers map[uintptr]bool
-
-	// Total count of created buffers. Handy for finding memory
-	// leaks.
-	createdBuffers int
+	buffersBySize []*sync.Pool
 }
 // NewBufferPool returns a BufferPool implementation that that returns

@@ -65,44 +48,22 @@ type bufferPoolImpl struct {
 // buffers beyond the handler's return.
 func NewBufferPool() BufferPool {
 	bp := new(bufferPoolImpl)
-	bp.buffersBySize = make([][][]byte, 0, 32)
-	bp.outstandingBuffers = make(map[uintptr]bool)
 	return bp
 }
 
-func (p *bufferPoolImpl) String() string {
-	p.lock.Lock()
-	defer p.lock.Unlock()
-
-	result := []string{}
-	for exp, bufs := range p.buffersBySize {
-		if len(bufs) > 0 {
-			result = append(result, fmt.Sprintf("%d=%d", exp, len(bufs)))
-		}
-	}
-	return fmt.Sprintf("created: %d, outstanding %d. Sizes: %s",
-		p.createdBuffers, len(p.outstandingBuffers),
-		strings.Join(result, ", "))
-}
-
-func (p *bufferPoolImpl) getBuffer(pageCount int) []byte {
-	for ; pageCount < len(p.buffersBySize); pageCount++ {
-		bufferList := p.buffersBySize[pageCount]
-		if len(bufferList) > 0 {
-			result := bufferList[len(bufferList)-1]
-			p.buffersBySize[pageCount] = p.buffersBySize[pageCount][:len(bufferList)-1]
-			return result
-		}
-	}
-	return nil
-}
-
-func (p *bufferPoolImpl) addBuffer(slice []byte, pages int) {
-	for len(p.buffersBySize) <= int(pages) {
-		p.buffersBySize = append(p.buffersBySize, make([][]byte, 0))
-	}
-	p.buffersBySize[pages] = append(p.buffersBySize[pages], slice)
-}
+func (p *bufferPoolImpl) getPool(pageCount int) *sync.Pool {
+	p.lock.Lock()
+	for len(p.buffersBySize) < pageCount+1 {
+		p.buffersBySize = append(p.buffersBySize, nil)
+	}
+	if p.buffersBySize[pageCount] == nil {
+		p.buffersBySize[pageCount] = &sync.Pool{
+			New: func() interface{} { return make([]byte, PAGESIZE*pageCount) },
+		}
+	}
+	pool := p.buffersBySize[pageCount]
+	p.lock.Unlock()
+	return pool
+}
 func (p *bufferPoolImpl) AllocBuffer(size uint32) []byte {

@@ -114,27 +75,9 @@ func (p *bufferPoolImpl) AllocBuffer(size uint32) []byte {
 	if sz%PAGESIZE != 0 {
 		sz += PAGESIZE
 	}
-	psz := sz / PAGESIZE
-
-	p.lock.Lock()
-	var b []byte
-
-	b = p.getBuffer(psz)
-	if b == nil {
-		p.createdBuffers++
-		b = make([]byte, size, psz*PAGESIZE)
-	} else {
-		b = b[:size]
-	}
-
-	p.outstandingBuffers[uintptr(unsafe.Pointer(&b[0]))] = true
-
-	// For testing should not have more than 20 buffers outstanding.
-	if paranoia && (p.createdBuffers > 50 || len(p.outstandingBuffers) > 50) {
-		panic("Leaking buffers")
-	}
-	p.lock.Unlock()
-
+	pages := sz / PAGESIZE
+
+	b := p.getPool(pages).Get().([]byte)
 	return b
 }
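To make the size-to-pool mapping concrete, here is a small standalone sketch of the same rounding arithmetic, assuming an illustrative PAGESIZE of 4096 (the real constant is defined by the fuse package).

package main

import "fmt"

const PAGESIZE = 4096 // illustrative; the fuse package defines its own constant

// roundToPages mirrors the rounding done in AllocBuffer: requests are rounded
// up to at least one page, and a partial trailing page costs one more page.
func roundToPages(size int) int {
	sz := size
	if sz < PAGESIZE {
		sz = PAGESIZE
	}
	if sz%PAGESIZE != 0 {
		sz += PAGESIZE
	}
	return sz / PAGESIZE
}

func main() {
	fmt.Println(roundToPages(100))  // 1 page  (4096-byte buffer)
	fmt.Println(roundToPages(4096)) // 1 page
	fmt.Println(roundToPages(5000)) // 2 pages (8192-byte buffer)
}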
@@ -145,15 +88,8 @@ func (p *bufferPoolImpl) FreeBuffer(slice []byte) {
 	if cap(slice)%PAGESIZE != 0 || cap(slice) == 0 {
 		return
 	}
-	psz := cap(slice) / PAGESIZE
-	slice = slice[:psz]
-	key := uintptr(unsafe.Pointer(&slice[0]))
-
-	p.lock.Lock()
-	ok := p.outstandingBuffers[key]
-	if ok {
-		p.addBuffer(slice, psz)
-		delete(p.outstandingBuffers, key)
-	}
-	p.lock.Unlock()
+	pages := cap(slice) / PAGESIZE
+	slice = slice[:cap(slice)]
+
+	p.getPool(pages).Put(slice)
 }
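A minimal usage sketch of the reworked pool from client code follows; the import path is the go-fuse path of this era and the buffer size is arbitrary. Because sync.Pool may discard idle buffers at a garbage collection, reuse after FreeBuffer is likely but not guaranteed.

package main

import (
	"fmt"

	"github.com/hanwen/go-fuse/fuse"
)

func main() {
	bp := fuse.NewBufferPool()

	buf := bp.AllocBuffer(64 * 1024) // rounded up to whole pages internally
	fmt.Println(len(buf), cap(buf))

	// Returning the buffer makes it available for reuse; sync.Pool may
	// still drop it at the next garbage collection, so a subsequent
	// AllocBuffer is not guaranteed to return the same backing array.
	bp.FreeBuffer(buf)
}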
fuse/bufferpool_test.go

package fuse

import (
	"testing"
)

func TestBufferPool(t *testing.T) {
	bp := NewBufferPool()
	b1 := bp.AllocBuffer(PAGESIZE)
	_ = bp.AllocBuffer(2 * PAGESIZE)
	bp.FreeBuffer(b1)

	b1_2 := bp.AllocBuffer(PAGESIZE)
	if &b1[0] != &b1_2[0] {
		t.Error("bp 0")
	}
}

func TestFreeBufferEmpty(t *testing.T) {
	bp := NewBufferPool()
	c := make([]byte, 0, 2*PAGESIZE)
	bp.FreeBuffer(c)
}
fuse/server.go

@@ -182,15 +182,12 @@ func NewServer(fs RawFileSystem, mountPoint string, opts *MountOptions) (*Server
 // DebugData returns internal status information for debugging
 // purposes.
 func (ms *Server) DebugData() string {
-	s := ms.opts.Buffers.String()
-
 	var r int
 	ms.reqMu.Lock()
 	r = ms.reqReaders
 	ms.reqMu.Unlock()
 
-	s += fmt.Sprintf(" readers: %d", r)
-	return s
+	return fmt.Sprintf("readers: %d", r)
 }
 
 // What is a good number? Maybe the number of CPUs?