Commit 097754d4 authored by Han-Wen Nienhuys's avatar Han-Wen Nienhuys

Prepare read API for splicing.

parent de0799bb
...@@ -77,6 +77,7 @@ type FsNode interface { ...@@ -77,6 +77,7 @@ type FsNode interface {
StatFs() *StatfsOut StatFs() *StatfsOut
} }
// A filesystem API that uses paths rather than inodes. A minimal // A filesystem API that uses paths rather than inodes. A minimal
// file system should have at least a functional GetAttr method. // file system should have at least a functional GetAttr method.
// Typically, each call happens in its own goroutine, so take care to // Typically, each call happens in its own goroutine, so take care to
...@@ -161,7 +162,7 @@ type File interface { ...@@ -161,7 +162,7 @@ type File interface {
// the inner file here. // the inner file here.
InnerFile() File InnerFile() File
Read(dest []byte, off int64) ([]byte, Status) Read(dest []byte, off int64) ReadResult
Write(data []byte, off int64) (written uint32, code Status) Write(data []byte, off int64) (written uint32, code Status)
Flush() Status Flush() Status
Release() Release()
...@@ -278,7 +279,7 @@ type RawFileSystem interface { ...@@ -278,7 +279,7 @@ type RawFileSystem interface {
// File handling. // File handling.
Create(out *raw.CreateOut, header *raw.InHeader, input *raw.CreateIn, name string) (code Status) Create(out *raw.CreateOut, header *raw.InHeader, input *raw.CreateIn, name string) (code Status)
Open(out *raw.OpenOut, header *raw.InHeader, input *raw.OpenIn) (status Status) Open(out *raw.OpenOut, header *raw.InHeader, input *raw.OpenIn) (status Status)
Read(*raw.InHeader, *ReadIn, []byte) ([]byte, Status) Read(*raw.InHeader, *ReadIn, []byte) ReadResult
Release(header *raw.InHeader, input *raw.ReleaseIn) Release(header *raw.InHeader, input *raw.ReleaseIn)
Write(*raw.InHeader, *WriteIn, []byte) (written uint32, code Status) Write(*raw.InHeader, *WriteIn, []byte) (written uint32, code Status)
......
...@@ -30,24 +30,26 @@ func CopyFile(srcFs, destFs FileSystem, srcFile, destFile string, context *Conte ...@@ -30,24 +30,26 @@ func CopyFile(srcFs, destFs FileSystem, srcFile, destFile string, context *Conte
buf := make([]byte, 128 * (1 << 10)) buf := make([]byte, 128 * (1 << 10))
off := int64(0) off := int64(0)
for { for {
data, code := src.Read(buf, off) res := src.Read(buf, off)
if !code.Ok() { if !res.Ok() {
return code return res.Status
} }
if len(data) == 0 { res.Read(buf)
if len(res.Data) == 0 {
break break
} }
n, code := dst.Write(data, off) n, code := dst.Write(res.Data, off)
if !code.Ok() { if !code.Ok() {
return code return code
} }
if int(n) < len(data) { if int(n) < len(res.Data) {
return EIO return EIO
} }
if len(data) < len(buf) { if len(res.Data) < len(buf) {
break break
} }
off += int64(len(data)) off += int64(len(res.Data))
} }
return OK return OK
} }
...@@ -8,6 +8,8 @@ import ( ...@@ -8,6 +8,8 @@ import (
var _ = log.Println var _ = log.Println
var _ = (File)((*DefaultFile)(nil))
func (f *DefaultFile) SetInode(*Inode) { func (f *DefaultFile) SetInode(*Inode) {
} }
...@@ -19,8 +21,8 @@ func (f *DefaultFile) String() string { ...@@ -19,8 +21,8 @@ func (f *DefaultFile) String() string {
return "DefaultFile" return "DefaultFile"
} }
func (f *DefaultFile) Read(buf []byte, off int64) ([]byte, Status) { func (f *DefaultFile) Read(buf []byte, off int64) ReadResult {
return nil, ENOSYS return ReadResult{Status: ENOSYS}
} }
func (f *DefaultFile) Write(data []byte, off int64) (uint32, Status) { func (f *DefaultFile) Write(data []byte, off int64) (uint32, Status) {
......
...@@ -97,8 +97,8 @@ func (fs *DefaultRawFileSystem) OpenDir(out *raw.OpenOut, header *raw.InHeader, ...@@ -97,8 +97,8 @@ func (fs *DefaultRawFileSystem) OpenDir(out *raw.OpenOut, header *raw.InHeader,
return ENOSYS return ENOSYS
} }
func (fs *DefaultRawFileSystem) Read(header *raw.InHeader, input *ReadIn, buf []byte) ([]byte, Status) { func (fs *DefaultRawFileSystem) Read(header *raw.InHeader, input *ReadIn, buf []byte) ReadResult {
return nil, ENOSYS return ReadResult{}
} }
func (fs *DefaultRawFileSystem) Release(header *raw.InHeader, input *raw.ReleaseIn) { func (fs *DefaultRawFileSystem) Release(header *raw.InHeader, input *raw.ReleaseIn) {
......
...@@ -2,7 +2,6 @@ package fuse ...@@ -2,7 +2,6 @@ package fuse
import ( import (
"fmt" "fmt"
"io"
"os" "os"
"syscall" "syscall"
) )
...@@ -40,13 +39,15 @@ func NewDataFile(data []byte) *DataFile { ...@@ -40,13 +39,15 @@ func NewDataFile(data []byte) *DataFile {
return f return f
} }
func (f *DataFile) Read(buf []byte, off int64) ([]byte, Status) { func (f *DataFile) Read(buf []byte, off int64) (res ReadResult) {
end := int(off) + int(len(buf)) end := int(off) + int(len(buf))
if end > len(f.data) { if end > len(f.data) {
end = len(f.data) end = len(f.data)
} }
return f.data[off:end], OK res.Data = f.data[off:end]
res.Status = OK
return res
} }
//////////////// ////////////////
...@@ -66,8 +67,8 @@ func (f *DevNullFile) String() string { ...@@ -66,8 +67,8 @@ func (f *DevNullFile) String() string {
return "DevNullFile" return "DevNullFile"
} }
func (f *DevNullFile) Read(buf []byte, off int64) ([]byte, Status) { func (f *DevNullFile) Read(buf []byte, off int64) ReadResult {
return nil, OK return ReadResult{}
} }
func (f *DevNullFile) Write(content []byte, off int64) (uint32, Status) { func (f *DevNullFile) Write(content []byte, off int64) (uint32, Status) {
...@@ -99,12 +100,13 @@ func (f *LoopbackFile) String() string { ...@@ -99,12 +100,13 @@ func (f *LoopbackFile) String() string {
return fmt.Sprintf("LoopbackFile(%s)", f.File.Name()) return fmt.Sprintf("LoopbackFile(%s)", f.File.Name())
} }
func (f *LoopbackFile) Read(buf []byte, off int64) ([]byte, Status) { func (f *LoopbackFile) Read(buf []byte, off int64) (res ReadResult) {
n, err := f.File.ReadAt(buf, off) return ReadResult{
if err == io.EOF { Fd: f.File.Fd(),
err = nil FdOff: off,
FdSize:len(buf),
Status: OK,
} }
return buf[:n], ToStatus(err)
} }
func (f *LoopbackFile) Write(data []byte, off int64) (uint32, Status) { func (f *LoopbackFile) Write(data []byte, off int64) (uint32, Status) {
......
...@@ -22,8 +22,9 @@ func (f *MutableDataFile) String() string { ...@@ -22,8 +22,9 @@ func (f *MutableDataFile) String() string {
return "MutableDataFile" return "MutableDataFile"
} }
func (f *MutableDataFile) Read(buf []byte, off int64) ([]byte, Status) { func (f *MutableDataFile) Read(buf []byte, off int64) (r ReadResult) {
return f.data[off : off+int64(len(buf))], OK r.Data = f.data[off : off+int64(len(buf))]
return r
} }
func (f *MutableDataFile) Write(d []byte, off int64) (uint32, Status) { func (f *MutableDataFile) Write(d []byte, off int64) (uint32, Status) {
......
...@@ -350,7 +350,7 @@ func (c *FileSystemConnector) Write(header *raw.InHeader, input *WriteIn, data [ ...@@ -350,7 +350,7 @@ func (c *FileSystemConnector) Write(header *raw.InHeader, input *WriteIn, data [
return opened.WithFlags.File.Write(data, int64(input.Offset)) return opened.WithFlags.File.Write(data, int64(input.Offset))
} }
func (c *FileSystemConnector) Read(header *raw.InHeader, input *ReadIn, buf []byte) ([]byte, Status) { func (c *FileSystemConnector) Read(header *raw.InHeader, input *ReadIn, buf []byte) (ReadResult) {
node := c.toInode(header.NodeId) node := c.toInode(header.NodeId)
opened := node.mount.getOpenedFile(input.Fh) opened := node.mount.getOpenedFile(input.Fh)
......
...@@ -276,7 +276,7 @@ func (fs *LockingRawFileSystem) ReleaseDir(header *raw.InHeader, h *raw.ReleaseI ...@@ -276,7 +276,7 @@ func (fs *LockingRawFileSystem) ReleaseDir(header *raw.InHeader, h *raw.ReleaseI
fs.RawFileSystem.ReleaseDir(header, h) fs.RawFileSystem.ReleaseDir(header, h)
} }
func (fs *LockingRawFileSystem) Read(header *raw.InHeader, input *ReadIn, buf []byte) ([]byte, Status) { func (fs *LockingRawFileSystem) Read(header *raw.InHeader, input *ReadIn, buf []byte) ReadResult {
defer fs.locked()() defer fs.locked()()
return fs.RawFileSystem.Read(header, input, buf) return fs.RawFileSystem.Read(header, input, buf)
} }
......
...@@ -322,6 +322,10 @@ func (ms *MountState) handleRequest(req *request) { ...@@ -322,6 +322,10 @@ func (ms *MountState) handleRequest(req *request) {
} }
func (ms *MountState) AllocOut(req *request, size uint32) []byte { func (ms *MountState) AllocOut(req *request, size uint32) []byte {
if cap(req.bufferPoolOutputBuf) >= int(size) {
req.bufferPoolOutputBuf = req.bufferPoolOutputBuf[:size]
return req.bufferPoolOutputBuf
}
if req.bufferPoolOutputBuf != nil { if req.bufferPoolOutputBuf != nil {
ms.buffers.FreeBuffer(req.bufferPoolOutputBuf) ms.buffers.FreeBuffer(req.bufferPoolOutputBuf)
} }
...@@ -336,7 +340,7 @@ func (ms *MountState) write(req *request) Status { ...@@ -336,7 +340,7 @@ func (ms *MountState) write(req *request) Status {
return OK return OK
} }
header, data := req.serialize() header := req.serializeHeader()
if ms.Debug { if ms.Debug {
log.Println(req.OutputDebug()) log.Println(req.OutputDebug())
} }
...@@ -348,16 +352,30 @@ func (ms *MountState) write(req *request) Status { ...@@ -348,16 +352,30 @@ func (ms *MountState) write(req *request) Status {
if header == nil { if header == nil {
return OK return OK
} }
var err error
if data == nil { if req.flatData.Size() == 0 {
_, err = ms.mountFile.Write(header) _, err := ms.mountFile.Write(header)
} else { return ToStatus(err)
_, err = Writev(int(ms.mountFile.Fd()), [][]byte{header, data}) }
if req.flatData.FdSize > 0 {
if ms.TrySplice(req) {
return OK
} else {
buf := ms.AllocOut(req, uint32(req.flatData.FdSize))
req.flatData.Read(buf)
}
} }
_, err := Writev(int(ms.mountFile.Fd()), [][]byte{header, req.flatData.Data})
return ToStatus(err) return ToStatus(err)
} }
func (ms *MountState) TrySplice(req *request) bool {
// TODO - implement.
return false
}
func (ms *MountState) writeInodeNotify(entry *raw.NotifyInvalInodeOut) Status { func (ms *MountState) writeInodeNotify(entry *raw.NotifyInvalInodeOut) Status {
req := request{ req := request{
inHeader: &raw.InHeader{ inHeader: &raw.InHeader{
...@@ -392,7 +410,7 @@ func (ms *MountState) writeEntryNotify(parent uint64, name string) Status { ...@@ -392,7 +410,7 @@ func (ms *MountState) writeEntryNotify(parent uint64, name string) Status {
// terminating null byte is missing. // terminating null byte is missing.
nameBytes := []byte(name + "\000") nameBytes := []byte(name + "\000")
req.outData = unsafe.Pointer(entry) req.outData = unsafe.Pointer(entry)
req.flatData = nameBytes req.flatData.Data = nameBytes
result := ms.write(&req) result := ms.write(&req)
if ms.Debug { if ms.Debug {
......
...@@ -123,7 +123,7 @@ func doReadDir(state *MountState, req *request) { ...@@ -123,7 +123,7 @@ func doReadDir(state *MountState, req *request) {
entries := NewDirEntryList(buf, uint64(in.Offset)) entries := NewDirEntryList(buf, uint64(in.Offset))
code := state.fileSystem.ReadDir(entries, req.inHeader, in) code := state.fileSystem.ReadDir(entries, req.inHeader, in)
req.flatData = entries.Bytes() req.flatData.Data = entries.Bytes()
req.status = code req.status = code
} }
...@@ -192,7 +192,7 @@ func doGetXAttr(state *MountState, req *request) { ...@@ -192,7 +192,7 @@ func doGetXAttr(state *MountState, req *request) {
return return
} }
req.flatData = data req.flatData.Data = data
} }
func doGetAttr(state *MountState, req *request) { func doGetAttr(state *MountState, req *request) {
...@@ -223,7 +223,7 @@ func doBatchForget(state *MountState, req *request) { ...@@ -223,7 +223,7 @@ func doBatchForget(state *MountState, req *request) {
} }
func doReadlink(state *MountState, req *request) { func doReadlink(state *MountState, req *request) {
req.flatData, req.status = state.fileSystem.Readlink(req.inHeader) req.flatData.Data, req.status = state.fileSystem.Readlink(req.inHeader)
} }
func doLookup(state *MountState, req *request) { func doLookup(state *MountState, req *request) {
...@@ -260,7 +260,13 @@ func doLink(state *MountState, req *request) { ...@@ -260,7 +260,13 @@ func doLink(state *MountState, req *request) {
func doRead(state *MountState, req *request) { func doRead(state *MountState, req *request) {
in := (*ReadIn)(req.inData) in := (*ReadIn)(req.inData)
buf := state.AllocOut(req, in.Size) buf := state.AllocOut(req, in.Size)
req.flatData, req.status = state.fileSystem.Read(req.inHeader, in, buf) res := state.fileSystem.Read(req.inHeader, in, buf)
if res.Ok() {
res.Read(buf)
}
req.flatData = res
req.status = res.Status
} }
func doFlush(state *MountState, req *request) { func doFlush(state *MountState, req *request) {
......
package fuse
import (
"io"
"syscall"
)
type ReadResult struct {
Status
Data []byte
// If Data is nil and Status OK, splice from the following
// file.
Fd uintptr
FdOff int64
FdSize int
}
func (r *ReadResult) Clear() {
*r = ReadResult{}
}
func (r *ReadResult) Size() int {
if r.Data != nil {
return len(r.Data)
}
return r.FdSize
}
func (r *ReadResult) Read(buf []byte) Status {
if r.Data != nil || !r.Ok() {
return r.Status
}
if len(buf) < r.FdSize {
r.Status = ERANGE
return ERANGE
}
n, err := syscall.Pread(int(r.Fd), buf[:r.FdSize], r.FdOff)
if err == io.EOF {
err = nil
}
r.Status = ToStatus(err)
if r.Ok() {
r.Data = buf[:n]
}
r.Fd = 0
r.FdOff = 0
r.FdSize = 0
return r.Status
}
...@@ -26,8 +26,8 @@ type request struct { ...@@ -26,8 +26,8 @@ type request struct {
// Unstructured data, a pointer to the relevant XxxxOut struct. // Unstructured data, a pointer to the relevant XxxxOut struct.
outData unsafe.Pointer outData unsafe.Pointer
status Status status Status
flatData []byte flatData ReadResult
// Start timestamp for timing info. // Start timestamp for timing info.
startNs int64 startNs int64
preWriteNs int64 preWriteNs int64
...@@ -58,7 +58,7 @@ func (r *request) clear() { ...@@ -58,7 +58,7 @@ func (r *request) clear() {
r.filenames = nil r.filenames = nil
r.outData = nil r.outData = nil
r.status = OK r.status = OK
r.flatData = nil r.flatData.Clear()
r.preWriteNs = 0 r.preWriteNs = 0
r.startNs = 0 r.startNs = 0
r.handler = nil r.handler = nil
...@@ -100,12 +100,12 @@ func (r *request) OutputDebug() string { ...@@ -100,12 +100,12 @@ func (r *request) OutputDebug() string {
} }
flatStr := "" flatStr := ""
if len(r.flatData) > 0 { if r.flatData.Size() > 0 {
if r.handler.FileNameOut { if r.handler.FileNameOut {
s := strings.TrimRight(string(r.flatData), "\x00") s := strings.TrimRight(string(r.flatData.Data), "\x00")
flatStr = fmt.Sprintf(" %q", s) flatStr = fmt.Sprintf(" %q", s)
} else { } else {
flatStr = fmt.Sprintf(" %d bytes data\n", len(r.flatData)) flatStr = fmt.Sprintf(" %d bytes data\n", r.flatData.Size())
} }
} }
...@@ -175,7 +175,7 @@ func (r *request) parse() { ...@@ -175,7 +175,7 @@ func (r *request) parse() {
r.outData = unsafe.Pointer(&r.outBuf[sizeOfOutHeader]) r.outData = unsafe.Pointer(&r.outBuf[sizeOfOutHeader])
} }
func (r *request) serialize() (header []byte, data []byte) { func (r *request) serializeHeader() (header []byte) {
dataLength := r.handler.OutputSize dataLength := r.handler.OutputSize
if r.outData == nil || r.status > OK { if r.outData == nil || r.status > OK {
dataLength = 0 dataLength = 0
...@@ -187,11 +187,11 @@ func (r *request) serialize() (header []byte, data []byte) { ...@@ -187,11 +187,11 @@ func (r *request) serialize() (header []byte, data []byte) {
o.Unique = r.inHeader.Unique o.Unique = r.inHeader.Unique
o.Status = int32(-r.status) o.Status = int32(-r.status)
o.Length = uint32( o.Length = uint32(
int(sizeOfOutHeader) + int(dataLength) + int(len(r.flatData))) int(sizeOfOutHeader) + int(dataLength) + r.flatData.Size())
var asSlice []byte var asSlice []byte
toSlice(&asSlice, r.outData, dataLength) toSlice(&asSlice, r.outData, dataLength)
copy(header[sizeOfOutHeader:], asSlice) copy(header[sizeOfOutHeader:], asSlice)
return header, r.flatData return header
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment