Commit 1070358b authored by Han-Wen Nienhuys's avatar Han-Wen Nienhuys

nodefs: defer file closing until all goroutines have stopped using it.

Use a []uint32 in Inode to keep track of open files. This reduces the
memory consumption of this feature.

Add a test that tries to (unsuccessfully) trigger file closing on the
while file.GetAttr is under way
parent 995b283b
...@@ -117,7 +117,7 @@ type Operations interface { ...@@ -117,7 +117,7 @@ type Operations interface {
ListXAttr(ctx context.Context, dest []byte) (uint32, fuse.Status) ListXAttr(ctx context.Context, dest []byte) (uint32, fuse.Status)
// The methods below may be called on closed files, due to // The methods below may be called on closed files, due to
// concurrency. In that case, you should return EBADF. // concurrency.
GetAttr(ctx context.Context, f FileHandle, out *fuse.AttrOut) fuse.Status GetAttr(ctx context.Context, f FileHandle, out *fuse.AttrOut) fuse.Status
// Lookup should find a direct child of the node by child name. // Lookup should find a direct child of the node by child name.
...@@ -208,8 +208,7 @@ type FileHandle interface { ...@@ -208,8 +208,7 @@ type FileHandle interface {
Release() Release()
// The methods below may be called on closed files, due to // The methods below may be called on closed files, due to
// concurrency. In that case, you should return EBADF. // concurrency.
// TODO - fold into a setattr method?
GetAttr(ctx context.Context, out *fuse.AttrOut) fuse.Status GetAttr(ctx context.Context, out *fuse.AttrOut) fuse.Status
Truncate(ctx context.Context, size uint64) fuse.Status Truncate(ctx context.Context, size uint64) fuse.Status
Chown(ctx context.Context, uid uint32, gid uint32) fuse.Status Chown(ctx context.Context, uid uint32, gid uint32) fuse.Status
......
...@@ -17,10 +17,15 @@ import ( ...@@ -17,10 +17,15 @@ import (
type fileEntry struct { type fileEntry struct {
file FileHandle file FileHandle
// index into Inode.openFiles
nodeIndex int
// Directory // Directory
dirStream DirStream dirStream DirStream
hasOverflow bool hasOverflow bool
overflow fuse.DirEntry overflow fuse.DirEntry
wg sync.WaitGroup
} }
type rawBridge struct { type rawBridge struct {
...@@ -35,7 +40,7 @@ type rawBridge struct { ...@@ -35,7 +40,7 @@ type rawBridge struct {
automaticIno uint64 automaticIno uint64
files []*fileEntry files []*fileEntry
freeFiles []uint64 freeFiles []uint32
} }
// newInode creates creates new inode pointing to node. // newInode creates creates new inode pointing to node.
...@@ -75,7 +80,6 @@ func (b *rawBridge) newInode(node Operations, mode uint32, id FileID, persistent ...@@ -75,7 +80,6 @@ func (b *rawBridge) newInode(node Operations, mode uint32, id FileID, persistent
bridge: b, bridge: b,
persistent: persistent, persistent: persistent,
parents: make(map[parentData]struct{}), parents: make(map[parentData]struct{}),
openFiles: make(map[FileHandle]uint32),
} }
if mode == fuse.S_IFDIR { if mode == fuse.S_IFDIR {
inode.children = make(map[string]*Inode) inode.children = make(map[string]*Inode)
...@@ -200,14 +204,14 @@ func (b *rawBridge) Mknod(cancel <-chan struct{}, input *fuse.MknodIn, name stri ...@@ -200,14 +204,14 @@ func (b *rawBridge) Mknod(cancel <-chan struct{}, input *fuse.MknodIn, name stri
} }
// addNewChild inserts the child into the tree. Returns file handle if file != nil. // addNewChild inserts the child into the tree. Returns file handle if file != nil.
func (b *rawBridge) addNewChild(parent *Inode, name string, child *Inode, file FileHandle, fileFlags uint32, out *fuse.EntryOut) uint64 { func (b *rawBridge) addNewChild(parent *Inode, name string, child *Inode, file FileHandle, fileFlags uint32, out *fuse.EntryOut) uint32 {
lockNodes(parent, child) lockNodes(parent, child)
parent.setEntry(name, child) parent.setEntry(name, child)
b.mu.Lock() b.mu.Lock()
child.lookupCount++ child.lookupCount++
var fh uint64 var fh uint32
if file != nil { if file != nil {
fh = b.registerFile(child, file, fileFlags) fh = b.registerFile(child, file, fileFlags)
} }
...@@ -241,7 +245,7 @@ func (b *rawBridge) Create(cancel <-chan struct{}, input *fuse.CreateIn, name st ...@@ -241,7 +245,7 @@ func (b *rawBridge) Create(cancel <-chan struct{}, input *fuse.CreateIn, name st
return status return status
} }
out.Fh = b.addNewChild(parent, name, child, f, input.Flags|syscall.O_CREAT, &out.EntryOut) out.Fh = uint64(b.addNewChild(parent, name, child, f, input.Flags|syscall.O_CREAT, &out.EntryOut))
b.setEntryOutTimeout(&out.EntryOut) b.setEntryOutTimeout(&out.EntryOut)
out.OpenFlags = flags out.OpenFlags = flags
...@@ -270,15 +274,15 @@ func (b *rawBridge) SetDebug(debug bool) {} ...@@ -270,15 +274,15 @@ func (b *rawBridge) SetDebug(debug bool) {}
func (b *rawBridge) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse.AttrOut) fuse.Status { func (b *rawBridge) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse.AttrOut) fuse.Status {
n, fEntry := b.inode(input.NodeId, input.Fh()) n, fEntry := b.inode(input.NodeId, input.Fh())
f := fEntry.file f := fEntry.file
if input.Flags()&fuse.FUSE_GETATTR_FH == 0 { if input.Flags()&fuse.FUSE_GETATTR_FH == 0 {
// The linux kernel doesnt pass along the file // The linux kernel doesnt pass along the file
// descriptor, so we have to fake it here. // descriptor, so we have to fake it here.
// See https://github.com/libfuse/libfuse/issues/62 // See https://github.com/libfuse/libfuse/issues/62
b.mu.Lock() b.mu.Lock()
// TODO: synchronize to avoid closing F while GetAttr runs. for _, fh := range n.openFiles {
for openF := range n.openFiles { f = b.files[fh].file
f = openF b.files[fh].wg.Add(1)
defer b.files[fh].wg.Done()
break break
} }
b.mu.Unlock() b.mu.Unlock()
...@@ -456,26 +460,28 @@ func (b *rawBridge) Open(cancel <-chan struct{}, input *fuse.OpenIn, out *fuse.O ...@@ -456,26 +460,28 @@ func (b *rawBridge) Open(cancel <-chan struct{}, input *fuse.OpenIn, out *fuse.O
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
out.Fh = b.registerFile(n, f, input.Flags) out.Fh = uint64(b.registerFile(n, f, input.Flags))
out.OpenFlags = flags out.OpenFlags = flags
return fuse.OK return fuse.OK
} }
// registerFile hands out a file handle. Must have bridge.mu // registerFile hands out a file handle. Must have bridge.mu
func (b *rawBridge) registerFile(n *Inode, f FileHandle, flags uint32) uint64 { func (b *rawBridge) registerFile(n *Inode, f FileHandle, flags uint32) uint32 {
var fh uint64 var fh uint32
if len(b.freeFiles) > 0 { if len(b.freeFiles) > 0 {
last := uint64(len(b.freeFiles) - 1) last := len(b.freeFiles) - 1
fh = b.freeFiles[last] fh = b.freeFiles[last]
b.freeFiles = b.freeFiles[:last] b.freeFiles = b.freeFiles[:last]
} else { } else {
fh = uint64(len(b.files)) fh = uint32(len(b.files))
b.files = append(b.files, &fileEntry{}) b.files = append(b.files, &fileEntry{})
} }
if f != nil {
n.openFiles[f] = flags fileEntry := b.files[fh]
b.files[fh].file = f fileEntry.nodeIndex = len(n.openFiles)
} fileEntry.file = f
n.openFiles = append(n.openFiles, fh)
return fh return fh
} }
...@@ -500,34 +506,44 @@ func (b *rawBridge) SetLkw(cancel <-chan struct{}, input *fuse.LkIn) (status fus ...@@ -500,34 +506,44 @@ func (b *rawBridge) SetLkw(cancel <-chan struct{}, input *fuse.LkIn) (status fus
} }
func (b *rawBridge) Release(input *fuse.ReleaseIn) { func (b *rawBridge) Release(input *fuse.ReleaseIn) {
n, f := b.inode(input.NodeId, input.Fh) n, f := b.releaseFileEntry(input.NodeId, input.Fh)
f.wg.Wait()
n.node.Release(f.file) n.node.Release(f.file)
b.releaseFileEntry(n, input.Fh) b.mu.Lock()
defer b.mu.Unlock()
b.freeFiles = append(b.freeFiles, uint32(input.Fh))
} }
func (b *rawBridge) ReleaseDir(input *fuse.ReleaseIn) { func (b *rawBridge) ReleaseDir(input *fuse.ReleaseIn) {
n, f := b.inode(input.NodeId, input.Fh) _, f := b.releaseFileEntry(input.NodeId, input.Fh)
f.wg.Wait()
if f.dirStream != nil { if f.dirStream != nil {
f.dirStream.Close() f.dirStream.Close()
} }
b.releaseFileEntry(n, input.Fh)
b.mu.Lock()
defer b.mu.Unlock()
b.freeFiles = append(b.freeFiles, uint32(input.Fh))
} }
func (b *rawBridge) releaseFileEntry(n *Inode, fh uint64) { func (b *rawBridge) releaseFileEntry(nid uint64, fh uint64) (*Inode, *fileEntry) {
b.mu.Lock()
defer b.mu.Unlock()
n := b.nodes[nid]
var entry *fileEntry
if fh > 0 { if fh > 0 {
b.mu.Lock() last := len(n.openFiles) - 1
defer b.mu.Unlock() entry = b.files[fh]
if last != entry.nodeIndex {
n.openFiles[entry.nodeIndex] = n.openFiles[last]
entry := b.files[fh] b.files[n.openFiles[entry.nodeIndex]].nodeIndex = entry.nodeIndex
if entry.file != nil {
delete(n.openFiles, entry.file)
} }
entry.dirStream = nil n.openFiles = n.openFiles[:last]
entry.file = nil
b.freeFiles = append(b.freeFiles, fh)
} }
return n, entry
} }
func (b *rawBridge) Write(cancel <-chan struct{}, input *fuse.WriteIn, data []byte) (written uint32, status fuse.Status) { func (b *rawBridge) Write(cancel <-chan struct{}, input *fuse.WriteIn, data []byte) (written uint32, status fuse.Status) {
...@@ -558,7 +574,7 @@ func (b *rawBridge) OpenDir(cancel <-chan struct{}, input *fuse.OpenIn, out *fus ...@@ -558,7 +574,7 @@ func (b *rawBridge) OpenDir(cancel <-chan struct{}, input *fuse.OpenIn, out *fus
} }
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
out.Fh = b.registerFile(n, nil, 0) out.Fh = uint64(b.registerFile(n, nil, 0))
return fuse.OK return fuse.OK
} }
......
...@@ -61,9 +61,8 @@ type Inode struct { ...@@ -61,9 +61,8 @@ type Inode struct {
// protected by bridge.mu // protected by bridge.mu
// TODO: store using an array and a per-inode handle; the map // file handles.
// is fairly heavyweight. openFiles []uint32
openFiles map[FileHandle]uint32
// mu protects the following mutable fields. When locking // mu protects the following mutable fields. When locking
// multiple Inodes, locks must be acquired using // multiple Inodes, locks must be acquired using
......
...@@ -54,7 +54,7 @@ func (tc *testCase) Clean() { ...@@ -54,7 +54,7 @@ func (tc *testCase) Clean() {
} }
} }
func newTestCase(t *testing.T) *testCase { func newTestCase(t *testing.T, entryCache bool, attrCache bool) *testCase {
tc := &testCase{ tc := &testCase{
dir: testutil.TempDir(), dir: testutil.TempDir(),
T: t, T: t,
...@@ -70,13 +70,21 @@ func newTestCase(t *testing.T) *testCase { ...@@ -70,13 +70,21 @@ func newTestCase(t *testing.T) *testCase {
tc.loopback = NewLoopback(tc.origDir) tc.loopback = NewLoopback(tc.origDir)
_ = time.Second _ = time.Second
oneSec := time.Second oneSec := time.Second
tc.rawFS = NewNodeFS(tc.loopback, &Options{
Debug: testutil.VerboseTest(),
// NOSUBMIT - should run all tests without cache too attrDT := &oneSec
EntryTimeout: &oneSec, if !attrCache {
AttrTimeout: &oneSec, attrDT = nil
}
entryDT := &oneSec
if !entryCache {
entryDT = nil
}
tc.rawFS = NewNodeFS(tc.loopback, &Options{
Debug: testutil.VerboseTest(),
EntryTimeout: entryDT,
AttrTimeout: attrDT,
}) })
var err error var err error
...@@ -96,7 +104,7 @@ func newTestCase(t *testing.T) *testCase { ...@@ -96,7 +104,7 @@ func newTestCase(t *testing.T) *testCase {
} }
func TestBasic(t *testing.T) { func TestBasic(t *testing.T) {
tc := newTestCase(t) tc := newTestCase(t, true, true)
defer tc.Clean() defer tc.Clean()
tc.writeOrig("file", "hello", 0644) tc.writeOrig("file", "hello", 0644)
...@@ -126,7 +134,7 @@ func TestBasic(t *testing.T) { ...@@ -126,7 +134,7 @@ func TestBasic(t *testing.T) {
} }
func TestFile(t *testing.T) { func TestFile(t *testing.T) {
tc := newTestCase(t) tc := newTestCase(t, true, true)
defer tc.Clean() defer tc.Clean()
content := []byte("hello world") content := []byte("hello world")
...@@ -168,7 +176,7 @@ func TestFile(t *testing.T) { ...@@ -168,7 +176,7 @@ func TestFile(t *testing.T) {
} }
func TestFileTruncate(t *testing.T) { func TestFileTruncate(t *testing.T) {
tc := newTestCase(t) tc := newTestCase(t, true, true)
defer tc.Clean() defer tc.Clean()
content := []byte("hello world") content := []byte("hello world")
...@@ -198,7 +206,7 @@ func TestFileTruncate(t *testing.T) { ...@@ -198,7 +206,7 @@ func TestFileTruncate(t *testing.T) {
} }
func TestFileFdLeak(t *testing.T) { func TestFileFdLeak(t *testing.T) {
tc := newTestCase(t) tc := newTestCase(t, true, true)
defer func() { defer func() {
if tc != nil { if tc != nil {
tc.Clean() tc.Clean()
...@@ -234,7 +242,7 @@ func TestFileFdLeak(t *testing.T) { ...@@ -234,7 +242,7 @@ func TestFileFdLeak(t *testing.T) {
} }
func TestMkdir(t *testing.T) { func TestMkdir(t *testing.T) {
tc := newTestCase(t) tc := newTestCase(t, true, true)
defer tc.Clean() defer tc.Clean()
if err := os.Mkdir(tc.mntDir+"/dir", 0755); err != nil { if err := os.Mkdir(tc.mntDir+"/dir", 0755); err != nil {
...@@ -253,7 +261,7 @@ func TestMkdir(t *testing.T) { ...@@ -253,7 +261,7 @@ func TestMkdir(t *testing.T) {
} }
func testRenameOverwrite(t *testing.T, destExists bool) { func testRenameOverwrite(t *testing.T, destExists bool) {
tc := newTestCase(t) tc := newTestCase(t, true, true)
defer tc.Clean() defer tc.Clean()
if err := os.Mkdir(tc.origDir+"/dir", 0755); err != nil { if err := os.Mkdir(tc.origDir+"/dir", 0755); err != nil {
...@@ -296,7 +304,7 @@ func TestRenameDestNoExist(t *testing.T) { ...@@ -296,7 +304,7 @@ func TestRenameDestNoExist(t *testing.T) {
} }
func TestRenameNoOverwrite(t *testing.T) { func TestRenameNoOverwrite(t *testing.T) {
tc := newTestCase(t) tc := newTestCase(t, true, true)
defer tc.Clean() defer tc.Clean()
if err := os.Mkdir(tc.origDir+"/dir", 0755); err != nil { if err := os.Mkdir(tc.origDir+"/dir", 0755); err != nil {
...@@ -324,7 +332,7 @@ func TestRenameNoOverwrite(t *testing.T) { ...@@ -324,7 +332,7 @@ func TestRenameNoOverwrite(t *testing.T) {
} }
func TestRenameExchange(t *testing.T) { func TestRenameExchange(t *testing.T) {
tc := newTestCase(t) tc := newTestCase(t, true, true)
defer tc.Clean() defer tc.Clean()
if err := os.Mkdir(tc.origDir+"/dir", 0755); err != nil { if err := os.Mkdir(tc.origDir+"/dir", 0755); err != nil {
...@@ -382,7 +390,7 @@ func TestRenameExchange(t *testing.T) { ...@@ -382,7 +390,7 @@ func TestRenameExchange(t *testing.T) {
func TestNlinkZero(t *testing.T) { func TestNlinkZero(t *testing.T) {
// xfstest generic/035. // xfstest generic/035.
tc := newTestCase(t) tc := newTestCase(t, true, true)
defer tc.Clean() defer tc.Clean()
src := tc.mntDir + "/src" src := tc.mntDir + "/src"
...@@ -420,7 +428,7 @@ func TestNlinkZero(t *testing.T) { ...@@ -420,7 +428,7 @@ func TestNlinkZero(t *testing.T) {
} }
func TestParallelFileOpen(t *testing.T) { func TestParallelFileOpen(t *testing.T) {
tc := newTestCase(t) tc := newTestCase(t, true, true)
defer tc.Clean() defer tc.Clean()
fn := tc.mntDir + "/file" fn := tc.mntDir + "/file"
...@@ -449,7 +457,7 @@ func TestParallelFileOpen(t *testing.T) { ...@@ -449,7 +457,7 @@ func TestParallelFileOpen(t *testing.T) {
} }
func TestSymlink(t *testing.T) { func TestSymlink(t *testing.T) {
tc := newTestCase(t) tc := newTestCase(t, true, true)
defer tc.Clean() defer tc.Clean()
fn := tc.mntDir + "/link" fn := tc.mntDir + "/link"
...@@ -466,7 +474,7 @@ func TestSymlink(t *testing.T) { ...@@ -466,7 +474,7 @@ func TestSymlink(t *testing.T) {
} }
func TestLink(t *testing.T) { func TestLink(t *testing.T) {
tc := newTestCase(t) tc := newTestCase(t, true, true)
defer tc.Clean() defer tc.Clean()
link := tc.mntDir + "/link" link := tc.mntDir + "/link"
...@@ -496,7 +504,7 @@ func TestLink(t *testing.T) { ...@@ -496,7 +504,7 @@ func TestLink(t *testing.T) {
} }
func TestNotifyEntry(t *testing.T) { func TestNotifyEntry(t *testing.T) {
tc := newTestCase(t) tc := newTestCase(t, true, true)
defer tc.Clean() defer tc.Clean()
orig := tc.origDir + "/file" orig := tc.origDir + "/file"
...@@ -532,7 +540,7 @@ func TestNotifyEntry(t *testing.T) { ...@@ -532,7 +540,7 @@ func TestNotifyEntry(t *testing.T) {
// XXX Test NotifyDelete? // XXX Test NotifyDelete?
func TestReadDir(t *testing.T) { func TestReadDir(t *testing.T) {
tc := newTestCase(t) tc := newTestCase(t, true, true)
defer tc.Clean() defer tc.Clean()
f, err := os.Open(tc.mntDir) f, err := os.Open(tc.mntDir)
...@@ -574,7 +582,7 @@ func TestReadDir(t *testing.T) { ...@@ -574,7 +582,7 @@ func TestReadDir(t *testing.T) {
// This test is racy. If an external process consumes space while this // This test is racy. If an external process consumes space while this
// runs, we may see spurious differences between the two statfs() calls. // runs, we may see spurious differences between the two statfs() calls.
func TestStatFs(t *testing.T) { func TestStatFs(t *testing.T) {
tc := newTestCase(t) tc := newTestCase(t, true, true)
defer tc.Clean() defer tc.Clean()
empty := syscall.Statfs_t{} empty := syscall.Statfs_t{}
...@@ -598,7 +606,7 @@ func TestStatFs(t *testing.T) { ...@@ -598,7 +606,7 @@ func TestStatFs(t *testing.T) {
} }
func TestXAttr(t *testing.T) { func TestXAttr(t *testing.T) {
tc := newTestCase(t) tc := newTestCase(t, true, true)
defer tc.Clean() defer tc.Clean()
tc.writeOrig("file", "", 0644) tc.writeOrig("file", "", 0644)
...@@ -631,3 +639,48 @@ func TestXAttr(t *testing.T) { ...@@ -631,3 +639,48 @@ func TestXAttr(t *testing.T) {
t.Fatalf("got %v want ENOATTR", err) t.Fatalf("got %v want ENOATTR", err)
} }
} }
func TestGetAttrParallel(t *testing.T) {
// We grab a file-handle to provide to the API so rename+fstat
// can be handled correctly. Here, test that closing and
// (f)stat in parallel don't lead to fstat on closed files.
// We can only test that if we switch off caching
tc := newTestCase(t, false, false)
defer tc.Clean()
N := 100
var fds []int
var fns []string
for i := 0; i < N; i++ {
fn := fmt.Sprintf("file%d", i)
tc.writeOrig(fn, "ello", 0644)
fn = filepath.Join(tc.mntDir, fn)
fns = append(fns, fn)
fd, err := syscall.Open(fn, syscall.O_RDONLY, 0)
if err != nil {
t.Fatalf("Open %d: %v", i, err)
}
fds = append(fds, fd)
}
var wg sync.WaitGroup
wg.Add(2 * N)
for i := 0; i < N; i++ {
go func(i int) {
if err := syscall.Close(fds[i]); err != nil {
t.Errorf("close %d: %v", i, err)
}
wg.Done()
}(i)
go func(i int) {
var st syscall.Stat_t
if err := syscall.Lstat(fns[i], &st); err != nil {
t.Errorf("lstat %d: %v", i, err)
}
wg.Done()
}(i)
}
wg.Wait()
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment