Commit f8840792 authored by Han-Wen Nienhuys's avatar Han-Wen Nienhuys

nodefs: support cancellation

parent c22e8d7b
...@@ -14,12 +14,13 @@ Decisions ...@@ -14,12 +14,13 @@ Decisions
* Nodes can be "persistent", meaning their lifetime is not under * Nodes can be "persistent", meaning their lifetime is not under
control of the kernel. This is useful for constructing FS trees control of the kernel. This is useful for constructing FS trees
in advance, rather than driven by LOOKUP.. in advance, rather than driven by LOOKUP.
* The NodeID for FS tree node must be defined on creation and are * The NodeID for FS tree node must be defined on creation and are
immutable. By contrast, reusing NodeIds (eg. rsc/bazil FUSE, as immutable. By contrast, reusing NodeIds (eg. rsc/bazil FUSE, as
well as old go-fuse/fuse/nodefs) is racy when notify and FORGET well as old go-fuse/fuse/nodefs) needs extra synchronization to
operations race. avoid races with notify and FORGET, and makes handling the inode
Generation more complicated.
* The mode of an Inode is defined on creation. Files cannot change * The mode of an Inode is defined on creation. Files cannot change
type during their lifetime. This also prevents the common error type during their lifetime. This also prevents the common error
...@@ -32,7 +33,7 @@ Decisions ...@@ -32,7 +33,7 @@ Decisions
* Support for hard links. libfuse doesn't support this in the * Support for hard links. libfuse doesn't support this in the
high-level API. Extra care for race conditions is needed when high-level API. Extra care for race conditions is needed when
looking up the same file different paths. looking up the same file through different paths.
* do not issue Notify{Entry,Delete} as part of * do not issue Notify{Entry,Delete} as part of
AddChild/RmChild/MvChild: because NodeIDs are unique and AddChild/RmChild/MvChild: because NodeIDs are unique and
...@@ -42,41 +43,41 @@ Decisions ...@@ -42,41 +43,41 @@ Decisions
* Directory reading uses the DirStream. Semantics for rewinding * Directory reading uses the DirStream. Semantics for rewinding
directory reads, and adding files after opening (but before directory reads, and adding files after opening (but before
reading) are handled automatically. reading) are handled automatically. No support for directory
seeks.
To decide To decide
========= =========
* Should we provide automatic fileID numbering? * Should we provide automatic fileID numbering?
* One giant interface with many methods, or many one-method interfaces? * One giant interface with many methods, or many one-method
interfaces? Or some interface (file, dir, symlink, etc).
* one SetAttr method, or many (Chown, Truncate, etc.) * one SetAttr method, or many (Chown, Truncate, etc.)
* function signatures, or types? The latter is easier to remember? * function signatures, or types? The latter is easier to remember?
Easier to extend? Easier to extend? The latter less efficient (indirections/copies)
``` ```
func Lookup(name string, out *EntryOut) (Node, Status) { func Lookup(name string, out *EntryOut) (Node, Status) {
} }
or
type LookupOp struct { type LookupIn {
// in Name string
Name string }
type LookupOut {
// out fuse.EntryOut
Child Node
Out *EntryOut
} }
func Lookup(op LookupOp)
func Lookup(ctx context.Context, in *LookupIn, out *LookupOut)
``` ```
* What to do with semi-unused fields (CreateIn.Umask, OpenIn.Mode, etc.) * What to do with semi-unused fields (CreateIn.Umask, OpenIn.Mode, etc.)
* cancellation through context.Context (standard, more GC overhead)
or a custom context (could reuse across requests.)?
* Readlink return: []byte or string ? * Readlink return: []byte or string ?
* Should Operations.Lookup return *Inode or Operations ? * Should Operations.Lookup return *Inode or Operations ?
......
...@@ -113,8 +113,8 @@ type Operations interface { ...@@ -113,8 +113,8 @@ type Operations interface {
// Lookup should find a direct child of the node by child name. // Lookup should find a direct child of the node by child name.
// //
// VFS makes sure to call Lookup only once for particular (node, name) // VFS makes sure to call Lookup only once for particular
// pair. // (node, name) pair concurrently.
Lookup(ctx context.Context, name string, out *fuse.EntryOut) (*Inode, fuse.Status) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (*Inode, fuse.Status)
Mkdir(ctx context.Context, name string, mode uint32, out *fuse.EntryOut) (*Inode, fuse.Status) Mkdir(ctx context.Context, name string, mode uint32, out *fuse.EntryOut) (*Inode, fuse.Status)
...@@ -135,8 +135,13 @@ type Operations interface { ...@@ -135,8 +135,13 @@ type Operations interface {
// ReadDir opens a stream of directory entries. // ReadDir opens a stream of directory entries.
ReadDir(ctx context.Context) (DirStream, fuse.Status) ReadDir(ctx context.Context) (DirStream, fuse.Status)
// Reads data from a file. The data should be returned as
// ReadResult, which may be constructed from the incoming
// `dest` buffer.
Read(ctx context.Context, f FileHandle, dest []byte, off int64) (fuse.ReadResult, fuse.Status) Read(ctx context.Context, f FileHandle, dest []byte, off int64) (fuse.ReadResult, fuse.Status)
// Writes the data into the file handle at given offset. After
// returning, the data will be reused and may not referenced.
Write(ctx context.Context, f FileHandle, data []byte, off int64) (written uint32, status fuse.Status) Write(ctx context.Context, f FileHandle, data []byte, off int64) (written uint32, status fuse.Status)
Fsync(ctx context.Context, f FileHandle, flags uint32) (status fuse.Status) Fsync(ctx context.Context, f FileHandle, flags uint32) (status fuse.Status)
...@@ -147,10 +152,11 @@ type Operations interface { ...@@ -147,10 +152,11 @@ type Operations interface {
Flush(ctx context.Context, f FileHandle) fuse.Status Flush(ctx context.Context, f FileHandle) fuse.Status
// This is called to before the file handle is forgotten. This // This is called to before the file handle is forgotten. This
// method has no return value, so nothing can synchronizes on // method has no return value, so nothing can synchronize on
// the call. Any cleanup that requires specific synchronization or // the call, and it cannot be canceled. Any cleanup that
// could fail with I/O errors should happen in Flush instead. // requires specific synchronization or could fail with I/O
Release(ctx context.Context, f FileHandle) // errors should happen in Flush instead.
Release(f FileHandle)
/* /*
NOSUBMIT - fold into a setattr method, or expand methods? NOSUBMIT - fold into a setattr method, or expand methods?
...@@ -168,6 +174,9 @@ type Operations interface { ...@@ -168,6 +174,9 @@ type Operations interface {
type FileHandle interface { type FileHandle interface {
Read(ctx context.Context, dest []byte, off int64) (fuse.ReadResult, fuse.Status) Read(ctx context.Context, dest []byte, off int64) (fuse.ReadResult, fuse.Status)
// Writes the data at given offset. After returning, the data
// will be reused and may not referenced.
Write(ctx context.Context, data []byte, off int64) (written uint32, status fuse.Status) Write(ctx context.Context, data []byte, off int64) (written uint32, status fuse.Status)
// File locking // File locking
...@@ -184,9 +193,10 @@ type FileHandle interface { ...@@ -184,9 +193,10 @@ type FileHandle interface {
// This is called to before the file handle is forgotten. This // This is called to before the file handle is forgotten. This
// method has no return value, so nothing can synchronizes on // method has no return value, so nothing can synchronizes on
// the call. Any cleanup that requires specific synchronization or // the call, and it cannot be canceled. Any cleanup that
// could fail with I/O errors should happen in Flush instead. // requires specific synchronization or could fail with I/O
Release(ctx context.Context) // errors should happen in Flush instead.
Release()
// The methods below may be called on closed files, due to // The methods below may be called on closed files, due to
// concurrency. In that case, you should return EBADF. // concurrency. In that case, you should return EBADF.
......
This diff is collapsed.
...@@ -150,9 +150,9 @@ func (n *DefaultOperations) Flush(ctx context.Context, f FileHandle) fuse.Status ...@@ -150,9 +150,9 @@ func (n *DefaultOperations) Flush(ctx context.Context, f FileHandle) fuse.Status
return fuse.ENOSYS return fuse.ENOSYS
} }
func (n *DefaultOperations) Release(ctx context.Context, f FileHandle) { func (n *DefaultOperations) Release(f FileHandle) {
if f != nil { if f != nil {
f.Release(ctx) f.Release()
} }
} }
...@@ -244,7 +244,7 @@ func (f *DefaultFile) Flush(ctx context.Context) fuse.Status { ...@@ -244,7 +244,7 @@ func (f *DefaultFile) Flush(ctx context.Context) fuse.Status {
return fuse.ENOSYS return fuse.ENOSYS
} }
func (f *DefaultFile) Release(ctx context.Context) { func (f *DefaultFile) Release() {
} }
......
...@@ -46,7 +46,7 @@ func (f *loopbackFile) Write(ctx context.Context, data []byte, off int64) (uint3 ...@@ -46,7 +46,7 @@ func (f *loopbackFile) Write(ctx context.Context, data []byte, off int64) (uint3
return uint32(n), fuse.ToStatus(err) return uint32(n), fuse.ToStatus(err)
} }
func (f *loopbackFile) Release(ctx context.Context) { func (f *loopbackFile) Release() {
f.mu.Lock() f.mu.Lock()
f.File.Close() f.File.Close()
f.mu.Unlock() f.mu.Unlock()
......
...@@ -5,9 +5,7 @@ ...@@ -5,9 +5,7 @@
package nodefs package nodefs
import ( import (
"bytes"
"context" "context"
"log"
"os" "os"
"os/exec" "os/exec"
"testing" "testing"
...@@ -19,56 +17,33 @@ import ( ...@@ -19,56 +17,33 @@ import (
type interruptRoot struct { type interruptRoot struct {
DefaultOperations DefaultOperations
child interruptOps
} }
type interruptOps struct { type interruptOps struct {
DefaultOperations DefaultOperations
Data []byte interrupted bool
} }
func (r *interruptRoot) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (*Inode, fuse.Status) { func (r *interruptRoot) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (*Inode, fuse.Status) {
if name != "file" { if name != "file" {
return nil, fuse.ENOENT return nil, fuse.ENOENT
} }
ch := InodeOf(r).NewInode(&interruptOps{ ch := InodeOf(r).NewInode(&r.child, fuse.S_IFREG, FileID{
DefaultOperations{},
bytes.Repeat([]byte{42}, 1024),
}, fuse.S_IFREG, FileID{
Ino: 2, Ino: 2,
Gen: 1}) Gen: 1})
out.Size = 1024
out.Mode = fuse.S_IFREG | 0644
return ch, fuse.OK return ch, fuse.OK
} }
func (o *interruptOps) GetAttr(ctx context.Context, f FileHandle, out *fuse.AttrOut) fuse.Status {
out.Mode = fuse.S_IFREG | 0644
out.Size = uint64(len(o.Data))
return fuse.OK
}
type interruptFile struct {
DefaultFile
}
func (f *interruptFile) Flush(ctx context.Context) fuse.Status {
return fuse.OK
}
func (o *interruptOps) Open(ctx context.Context, flags uint32) (FileHandle, uint32, fuse.Status) { func (o *interruptOps) Open(ctx context.Context, flags uint32) (FileHandle, uint32, fuse.Status) {
return &interruptFile{}, 0, fuse.OK select {
} case <-time.After(100 * time.Millisecond):
return nil, 0, fuse.EIO
func (o *interruptOps) Read(ctx context.Context, f FileHandle, dest []byte, off int64) (fuse.ReadResult, fuse.Status) { case <-ctx.Done():
time.Sleep(100 * time.Millisecond) o.interrupted = true
end := int(off) + len(dest) return nil, 0, fuse.EINTR
if end > len(o.Data) {
end = len(o.Data)
} }
return fuse.ReadResultData(o.Data[off:end]), fuse.OK
} }
// This currently doesn't test functionality, but is useful to investigate how // This currently doesn't test functionality, but is useful to investigate how
...@@ -76,11 +51,11 @@ func (o *interruptOps) Read(ctx context.Context, f FileHandle, dest []byte, off ...@@ -76,11 +51,11 @@ func (o *interruptOps) Read(ctx context.Context, f FileHandle, dest []byte, off
func TestInterrupt(t *testing.T) { func TestInterrupt(t *testing.T) {
mntDir := testutil.TempDir() mntDir := testutil.TempDir()
defer os.Remove(mntDir) defer os.Remove(mntDir)
loopback := &interruptRoot{DefaultOperations{}} root := &interruptRoot{}
_ = time.Second _ = time.Second
oneSec := time.Second oneSec := time.Second
rawFS := NewNodeFS(loopback, &Options{ rawFS := NewNodeFS(root, &Options{
Debug: testutil.VerboseTest(), Debug: testutil.VerboseTest(),
// NOSUBMIT - should run all tests without cache too // NOSUBMIT - should run all tests without cache too
...@@ -108,9 +83,12 @@ func TestInterrupt(t *testing.T) { ...@@ -108,9 +83,12 @@ func TestInterrupt(t *testing.T) {
} }
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
log.Println("killing subprocess")
if err := cmd.Process.Kill(); err != nil { if err := cmd.Process.Kill(); err != nil {
t.Errorf("Kill: %v", err) t.Errorf("Kill: %v", err)
} }
time.Sleep(100 * time.Millisecond)
server.Unmount()
if !root.child.interrupted {
t.Errorf("open request was not interrupted")
}
} }
...@@ -62,13 +62,13 @@ type loopbackNode struct { ...@@ -62,13 +62,13 @@ type loopbackNode struct {
openFiles map[*loopbackFile]uint32 openFiles map[*loopbackFile]uint32
} }
func (n *loopbackNode) Release(ctx context.Context, f FileHandle) { func (n *loopbackNode) Release(f FileHandle) {
if f != nil { if f != nil {
n.mu.Lock() n.mu.Lock()
defer n.mu.Unlock() defer n.mu.Unlock()
lf := f.(*loopbackFile) lf := f.(*loopbackFile)
delete(n.openFiles, lf) delete(n.openFiles, lf)
f.Release(ctx) f.Release()
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment