Commit 9d9c4ddf authored by Kirill Smelkov's avatar Kirill Smelkov Committed by Han-Wen Nienhuys

Add support to retrieve kernel cache (retrieve notify)

This patch continues bdca0e6a (Add support for store notify) and adds
corresponding support for counterpart to cache-store - to retrieve an inode
data from kernel cache.

As it was already noted in bdca0e6a, FUSE protocol provides primitives for
pagecache control: to invalidate a data region for inode (notify_inval_inode),
to store data into inode kernel's cache (notify_store), and to retrieve data
from inode kernel's cache (notify_retrieve). For the latter 2 FUSE protocol
messages and brief documentation about semantic can be seen here:

https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/fuse.h?id=v4.19-rc6-177-gcec4de302c5f#n68
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/fuse.h?id=v4.19-rc6-177-gcec4de302c5f#n756

https://git.kernel.org/linus/a1d75f2582
https://git.kernel.org/linus/2d45ba381a

In short, to retrieve data from kernel cache, filesystem server sends

	S > C	NOTIFY_RETRIEVE_CACHE{notifyUnique, inode, offset, size}

and if that message was sent correctly, the kernel sends back another
write-style message with unique=notifyUnique

	S < C	NOTIFY_REPLY{inode, offset, size, data}

Since so far there were no cases when a server was querying the kernel,
and the reply comes as separate kernel "request", we have to add
infrastructure for tracking such in-flight queries. This is done by
adding Server.retrieveTab and friends.

Otherwise the implementation is straightforward.

A particular note is that from a user-level API point of view we are not
following e.g. libfuse to register a callback to be invoked upon reply, but
instead provide {Inode,File}RetrieveCache that synchronously send notify query
and wait for kernel's reply.

This fits more naturally to Go and is easier to use.
parent 7954c216
......@@ -407,6 +407,32 @@ func (c *FileSystemConnector) FileNotifyStoreCache(node *Inode, off int64, data
return c.server.InodeNotifyStoreCache(nID, off, data)
}
// FileRetrieveCache retrieves data from kernel's inode cache.
//
// This call retrieves data from kernel's inode cache @ offset and up to
// len(dest) bytes. If kernel cache has fewer consecutive data starting at
// offset, that fewer amount is returned. In particular if inode data at offset
// is not cached (0, OK) is returned.
//
// If the kernel does not currently have entry for this inode in its dentry
// cache (0, OK) is still returned, pretending that the inode could be known to
// the kernel, but kernel's inode cache is empty.
func (c *FileSystemConnector) FileRetrieveCache(node *Inode, off int64, dest []byte) (n int, st fuse.Status) {
var nID uint64
if node == c.rootNode {
nID = fuse.FUSE_ROOT_ID
} else {
nID = c.inodeMap.Handle(&node.handled)
}
if nID == 0 {
// the kernel does not currently know about this inode.
// -> we can pretend that its cache for the inode is empty.
return 0, fuse.OK
}
return c.server.InodeRetrieveCache(nID, off, dest)
}
// EntryNotify makes the kernel forget the entry data from the given
// name from a directory. After this call, the kernel will issue a
// new lookup request for the given name when necessary. No filesystem
......
......@@ -61,9 +61,10 @@ const (
_OP_NOTIFY_INVAL_ENTRY = int32(100)
_OP_NOTIFY_INVAL_INODE = int32(101)
_OP_NOTIFY_STORE_CACHE = int32(102)
_OP_NOTIFY_DELETE = int32(103) // protocol version 18
_OP_NOTIFY_RETRIEVE_CACHE = int32(103)
_OP_NOTIFY_DELETE = int32(104) // protocol version 18
_OPCODE_COUNT = int32(104)
_OPCODE_COUNT = int32(105)
)
////////////////////////////////////////////////////////////////
......@@ -177,6 +178,44 @@ func doWrite(server *Server, req *request) {
req.status = status
}
func doNotifyReply(server *Server, req *request) {
reply := (*NotifyRetrieveIn)(req.inData)
server.retrieveMu.Lock()
reading := server.retrieveTab[reply.Unique]
delete(server.retrieveTab, reply.Unique)
server.retrieveMu.Unlock()
badf := func(format string, argv ...interface{}) {
log.Printf("notify reply: "+format, argv...)
}
if reading == nil {
badf("unexpected unique - ignoring")
return
}
reading.n = 0
reading.st = EIO
defer close(reading.ready)
if reading.nodeid != reply.NodeId {
badf("inode mismatch: expected %s, got %s", reading.nodeid, reply.NodeId)
return
}
if reading.offset != reply.Offset {
badf("offset mismatch: expected @%d, got @%d", reading.offset, reply.Offset)
return
}
if len(reading.dest) < len(req.arg) {
badf("too much data: requested %db, got %db (will use only %db)", len(reading.dest), len(req.arg), len(reading.dest))
}
reading.n = copy(reading.dest, req.arg)
reading.st = OK
}
const _SECURITY_CAPABILITY = "security.capability"
const _SECURITY_ACL = "system.posix_acl_access"
const _SECURITY_ACL_DEFAULT = "system.posix_acl_default"
......@@ -483,6 +522,7 @@ func init() {
_OP_BMAP: unsafe.Sizeof(_BmapIn{}),
_OP_IOCTL: unsafe.Sizeof(_IoctlIn{}),
_OP_POLL: unsafe.Sizeof(_PollIn{}),
_OP_NOTIFY_REPLY: unsafe.Sizeof(NotifyRetrieveIn{}),
_OP_FALLOCATE: unsafe.Sizeof(FallocateIn{}),
_OP_READDIRPLUS: unsafe.Sizeof(ReadIn{}),
} {
......@@ -512,6 +552,7 @@ func init() {
_OP_NOTIFY_INVAL_ENTRY: unsafe.Sizeof(NotifyInvalEntryOut{}),
_OP_NOTIFY_INVAL_INODE: unsafe.Sizeof(NotifyInvalInodeOut{}),
_OP_NOTIFY_STORE_CACHE: unsafe.Sizeof(NotifyStoreOut{}),
_OP_NOTIFY_RETRIEVE_CACHE: unsafe.Sizeof(NotifyRetrieveOut{}),
_OP_NOTIFY_DELETE: unsafe.Sizeof(NotifyInvalDeleteOut{}),
} {
operationHandlers[op].OutputSize = sz
......@@ -557,9 +598,11 @@ func init() {
_OP_DESTROY: "DESTROY",
_OP_IOCTL: "IOCTL",
_OP_POLL: "POLL",
_OP_NOTIFY_REPLY: "NOTIFY_REPLY",
_OP_NOTIFY_INVAL_ENTRY: "NOTIFY_INVAL_ENTRY",
_OP_NOTIFY_INVAL_INODE: "NOTIFY_INVAL_INODE",
_OP_NOTIFY_STORE_CACHE: "NOTIFY_STORE",
_OP_NOTIFY_RETRIEVE_CACHE: "NOTIFY_RETRIEVE",
_OP_NOTIFY_DELETE: "NOTIFY_DELETE",
_OP_FALLOCATE: "FALLOCATE",
_OP_READDIRPLUS: "READDIRPLUS",
......@@ -604,6 +647,7 @@ func init() {
_OP_STATFS: doStatFs,
_OP_IOCTL: doIoctl,
_OP_DESTROY: doDestroy,
_OP_NOTIFY_REPLY: doNotifyReply,
_OP_FALLOCATE: doFallocate,
_OP_READDIRPLUS: doReadDirPlus,
} {
......@@ -624,6 +668,7 @@ func init() {
_OP_NOTIFY_INVAL_ENTRY: func(ptr unsafe.Pointer) interface{} { return (*NotifyInvalEntryOut)(ptr) },
_OP_NOTIFY_INVAL_INODE: func(ptr unsafe.Pointer) interface{} { return (*NotifyInvalInodeOut)(ptr) },
_OP_NOTIFY_STORE_CACHE: func(ptr unsafe.Pointer) interface{} { return (*NotifyStoreOut)(ptr) },
_OP_NOTIFY_RETRIEVE_CACHE: func(ptr unsafe.Pointer) interface{} { return (*NotifyRetrieveOut)(ptr) },
_OP_NOTIFY_DELETE: func(ptr unsafe.Pointer) interface{} { return (*NotifyInvalDeleteOut)(ptr) },
_OP_STATFS: func(ptr unsafe.Pointer) interface{} { return (*StatfsOut)(ptr) },
_OP_SYMLINK: func(ptr unsafe.Pointer) interface{} { return (*EntryOut)(ptr) },
......@@ -655,6 +700,7 @@ func init() {
_OP_RELEASE: func(ptr unsafe.Pointer) interface{} { return (*ReleaseIn)(ptr) },
_OP_RELEASEDIR: func(ptr unsafe.Pointer) interface{} { return (*ReleaseIn)(ptr) },
_OP_FALLOCATE: func(ptr unsafe.Pointer) interface{} { return (*FallocateIn)(ptr) },
_OP_NOTIFY_REPLY: func(ptr unsafe.Pointer) interface{} { return (*NotifyRetrieveIn)(ptr) },
_OP_READDIRPLUS: func(ptr unsafe.Pointer) interface{} { return (*ReadIn)(ptr) },
_OP_RENAME: func(ptr unsafe.Pointer) interface{} { return (*RenameIn)(ptr) },
_OP_GETLK: func(ptr unsafe.Pointer) interface{} { return (*LkIn)(ptr) },
......
......@@ -236,6 +236,14 @@ func (o *NotifyStoreOut) string() string {
return fmt.Sprintf("{nodeid %d off %d sz %d}", o.Nodeid, o.Offset, o.Size)
}
func (o *NotifyRetrieveOut) string() string {
return fmt.Sprintf("{notifyUnique %d nodeid %d off %d sz %d}", o.NotifyUnique, o.Nodeid, o.Offset, o.Size)
}
func (i *NotifyRetrieveIn) string() string {
return fmt.Sprintf("{off %d sz %d}", i.Offset, i.Size)
}
func (f *FallocateIn) string() string {
return fmt.Sprintf("{Fh %d off %d sz %d mod 0%o}",
f.Fh, f.Offset, f.Length, f.Mode)
......
......@@ -48,6 +48,11 @@ type Server struct {
reqReaders int
kernelSettings InitIn
// in-flight notify-retrieve queries
retrieveMu sync.Mutex
retrieveNext uint64
retrieveTab map[uint64]*retrieveCacheRequest // notifyUnique -> retrieve request
singleReader bool
canSplice bool
loops sync.WaitGroup
......@@ -157,6 +162,7 @@ func NewServer(fs RawFileSystem, mountPoint string, opts *MountOptions) (*Server
ms := &Server{
fileSystem: fs,
opts: &o,
retrieveTab: make(map[uint64]*retrieveCacheRequest),
// OSX has races when multiple routines read from the
// FUSE device: on unmount, sometime some reads do not
// error-out, meaning that unmount will hang.
......@@ -328,6 +334,23 @@ func (ms *Server) Serve() {
ms.writeMu.Lock()
syscall.Close(ms.mountFd)
ms.writeMu.Unlock()
// shutdown in-flight cache retrieves.
//
// It is possible that umount comes in the middle - after retrieve
// request was sent to kernel, but corresponding kernel reply has not
// yet been read. We unblock all such readers and wake them up with ENODEV.
ms.retrieveMu.Lock()
rtab := ms.retrieveTab
// retrieve attempts might be erroneously tried even after close
// we have to keep retrieveTab !nil not to panic.
ms.retrieveTab = make(map[uint64]*retrieveCacheRequest)
ms.retrieveMu.Unlock()
for _, reading := range rtab {
reading.n = 0
reading.st = ENODEV
close(reading.ready)
}
}
func (ms *Server) handleInit() Status {
......@@ -427,8 +450,9 @@ func (ms *Server) allocOut(req *request, size uint32) []byte {
}
func (ms *Server) write(req *request) Status {
// Forget does not wait for reply.
if req.inHeader.Opcode == _OP_FORGET || req.inHeader.Opcode == _OP_BATCH_FORGET {
// Forget/NotifyReply do not wait for reply from filesystem server.
switch req.inHeader.Opcode {
case _OP_FORGET, _OP_BATCH_FORGET, _OP_NOTIFY_REPLY:
return OK
}
......@@ -535,6 +559,96 @@ func (ms *Server) inodeNotifyStoreCache32(node uint64, offset int64, data []byte
return result
}
// InodeRetrieveCache retrieves data from kernel's inode cache.
//
// InodeRetrieveCache asks kernel to return data from its cache for inode at
// [offset:offset+len(dest)) and waits for corresponding reply. If kernel cache
// has fewer consecutive data starting at offset, that fewer amount is returned.
// In particular if inode data at offset is not cached (0, OK) is returned.
func (ms *Server) InodeRetrieveCache(node uint64, offset int64, dest []byte) (n int, st Status) {
if !ms.kernelSettings.SupportsNotify(NOTIFY_RETRIEVE_CACHE) {
return 0, ENOSYS
}
req := request{
inHeader: &InHeader{
Opcode: _OP_NOTIFY_RETRIEVE_CACHE,
},
handler: operationHandlers[_OP_NOTIFY_RETRIEVE_CACHE],
status: NOTIFY_RETRIEVE_CACHE,
}
// retrieve up to 2GB not to overflow uint32 size in NotifyRetrieveOut.
// see InodeNotifyStoreCache in similar place for why it is only 2GB, not 4GB.
size := len(dest)
if size > math.MaxInt32 {
size = math.MaxInt32
}
dest = dest[:size]
q := (*NotifyRetrieveOut)(req.outData())
q.Nodeid = node
q.Offset = uint64(offset) // not int64, as it is e.g. in NotifyInvalInodeOut
q.Size = uint32(len(dest))
reading := &retrieveCacheRequest{
nodeid: q.Nodeid,
offset: q.Offset,
dest: dest,
ready: make(chan struct{}),
}
ms.retrieveMu.Lock()
q.NotifyUnique = ms.retrieveNext
ms.retrieveNext++
ms.retrieveTab[q.NotifyUnique] = reading
ms.retrieveMu.Unlock()
// Protect against concurrent close.
ms.writeMu.Lock()
result := ms.write(&req)
ms.writeMu.Unlock()
if ms.opts.Debug {
log.Printf("Response: NOTIFY_RETRIEVE_CACHE: %v", result)
}
if result != OK {
ms.retrieveMu.Lock()
r := ms.retrieveTab[q.NotifyUnique]
if r == reading {
delete(ms.retrieveTab, q.NotifyUnique)
} else if r == nil {
// ok - might be dequeued by umount
} else {
// although very unlikely, it is possible that kernel sends
// unexpected NotifyReply with our notifyUnique, then
// retrieveNext wraps, makes full cycle, and another
// retrieve request is made with the same notifyUnique.
log.Printf("W: INODE_RETRIEVE_CACHE: request with notifyUnique=%d mutated", q.NotifyUnique)
}
ms.retrieveMu.Unlock()
return 0, result
}
// NotifyRetrieveOut sent to the kernel successfully. Now the kernel
// have to return data in a separate write-style NotifyReply request.
// Wait for the result.
<-reading.ready
return reading.n, reading.st
}
// retrieveCacheRequest represents in-flight cache retrieve request.
type retrieveCacheRequest struct {
nodeid uint64
offset uint64
dest []byte
// reply status
n int
st Status
ready chan struct{}
}
// DeleteNotify notifies the kernel that an entry is removed from a
// directory. In many cases, this is equivalent to EntryNotify,
// except when the directory is in use, eg. as working directory of
......@@ -626,7 +740,7 @@ func (in *InitIn) SupportsNotify(notifyType int) bool {
return in.SupportsVersion(7, 12)
case NOTIFY_INVAL_INODE:
return in.SupportsVersion(7, 12)
case NOTIFY_STORE_CACHE:
case NOTIFY_STORE_CACHE, NOTIFY_RETRIEVE_CACHE:
return in.SupportsVersion(7, 15)
case NOTIFY_DELETE:
return in.SupportsVersion(7, 18)
......
......@@ -95,6 +95,42 @@ func TestCacheControl(t *testing.T) {
}
}
// assertCacheRead asserts that file's kernel cache is retrieved as dataOK.
assertCacheRead := func(subj, dataOK string) {
t.Helper()
assertCacheReadAt := func(offset int64, size int, dataOK string) {
t.Helper()
buf := make([]byte, size)
n, st := fsconn.FileRetrieveCache(file.Inode(), offset, buf)
if st != fuse.OK {
t.Fatalf("%s: retrieve cache @%d [%d]: %s", subj, offset, size, st)
}
if got := buf[:n]; string(got) != dataOK {
t.Fatalf("%s: retrieve cache @%d [%d]: have %q; want %q", subj, offset, size, got, dataOK)
}
}
// retrieve [1:len - 1] (also verifying that offset/size are handled correctly)
l := len(dataOK)
if l >= 2 {
assertCacheReadAt(1, l-2, dataOK[1:l-1])
}
// retrieve [:∞]
assertCacheReadAt(0, l+10000, dataOK)
}
// before the kernel has entry for file in its dentry cache, the cache
// should read as empty.
assertCacheRead("before lookup", "")
// lookup on the file - forces to assign inode ID to it
os.Stat(dir + "/hello.txt")
// cache should be initially empty
assertCacheRead("initial", "")
// make sure the file reads correctly
assertFileRead("original", data0)
......@@ -132,6 +168,7 @@ func TestCacheControl(t *testing.T) {
// make sure the cache has original data
assertMmapRead("original", data0)
assertCacheRead("original", data0)
// store changed data into OS cache
st := fsconn.FileNotifyStoreCache(file.Inode(), 7, []byte("123"))
......@@ -143,8 +180,7 @@ func TestCacheControl(t *testing.T) {
data1 := "hello w123d"
assertMmapRead("after storecache", data1)
assertFileRead("after storecache", data1)
// TODO verify retrieve cache
assertCacheRead("after storecache", data1)
// invalidate cache
st = fsconn.FileNotify(file.Inode(), 0, 0)
......@@ -152,7 +188,11 @@ func TestCacheControl(t *testing.T) {
t.Fatalf("invalidate cache: %s", st)
}
// make sure mmapped data and file read as original data
// make sure cache reads as empty right after invalidation
assertCacheRead("after invalcache", "")
// make sure mmapped data, file and cache read as original data
assertMmapRead("after invalcache", data0)
assertFileRead("after invalcache", data0)
assertCacheRead("after invalcache + refill", data0)
}
......@@ -386,12 +386,30 @@ type NotifyStoreOut struct {
Padding uint32
}
type NotifyRetrieveOut struct {
NotifyUnique uint64
Nodeid uint64
Offset uint64
Size uint32
Padding uint32
}
type NotifyRetrieveIn struct {
InHeader
Dummy1 uint64
Offset uint64
Size uint32
Dummy2 uint32
Dummy3 uint64
Dummy4 uint64
}
const (
// NOTIFY_POLL = -1 // notify kernel that a poll waiting for IO on a file handle should wake up
NOTIFY_INVAL_INODE = -2 // notify kernel that an inode should be invalidated
NOTIFY_INVAL_ENTRY = -3 // notify kernel that a directory entry should be invalidated
NOTIFY_STORE_CACHE = -4 // store data into kernel cache of an inode
// NOTIFY_RETRIEVE_CACHE = -5 // retrieve data from kernel cache of an inode
NOTIFY_RETRIEVE_CACHE = -5 // retrieve data from kernel cache of an inode
NOTIFY_DELETE = -6 // notify kernel that a directory entry has been deleted
// NOTIFY_CODE_MAX = -6
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment