Commit 87818b0d authored by Kirill Smelkov's avatar Kirill Smelkov Committed by Levin Zimmermann

wcfs: Cleanup zheadSockTab entry when client drops opened .wcfs/zhead handle

This was marked as TODO in server code and not implemented.
Without this cleanup zheadSockTab was growing indefinitely after every
open/close and leaking memory.

-> Fix it via registering RELEASE handler to FUSE and removing
corresponding zheadSockTab entry from there.

/reviewed-by @levin.zimmermann
/reviewed-on nexedi/wendelin.core!18
parent 8abfd27d
...@@ -321,11 +321,16 @@ func NewFileSock() *FileSock { ...@@ -321,11 +321,16 @@ func NewFileSock() *FileSock {
// The handle should be given to kernel as result of a file open, for that file // The handle should be given to kernel as result of a file open, for that file
// to be connected to the socket. // to be connected to the socket.
func (sk *FileSock) File() nodefs.File { func (sk *FileSock) File() nodefs.File {
return WithOpenStreamFlags(sk.file)
}
// WithOpenStreamFlags wraps file handle with FUSE flags needed when opening stream IO.
func WithOpenStreamFlags(file nodefs.File) nodefs.File {
// nonseekable & directio for opened file to have streaming semantic as // nonseekable & directio for opened file to have streaming semantic as
// if it was a socket. FOPEN_STREAM is used so that both read and write // if it was a socket. FOPEN_STREAM is used so that both read and write
// could be run simultaneously: git.kernel.org/linus/10dce8af3422 // could be run simultaneously: git.kernel.org/linus/10dce8af3422
return &nodefs.WithFlags{ return &nodefs.WithFlags{
File: sk.file, File: file,
FuseFlags: fuse.FOPEN_STREAM | fuse.FOPEN_NONSEEKABLE | fuse.FOPEN_DIRECT_IO, FuseFlags: fuse.FOPEN_STREAM | fuse.FOPEN_NONSEEKABLE | fuse.FOPEN_DIRECT_IO,
} }
} }
......
...@@ -2356,22 +2356,39 @@ func init() { ...@@ -2356,22 +2356,39 @@ func init() {
gdebug.zheadSockTab = make(map[*FileSock]struct{}) gdebug.zheadSockTab = make(map[*FileSock]struct{})
} }
// _wcfs_Zhead serves .wcfs/zhead opens. // _wcfs_Zhead serves .wcfs/zhead .
type _wcfs_Zhead struct { type _wcfs_Zhead struct {
fsNode fsNode
} }
func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) { // _wcfs_ZheadH serves .wcfs/zhead opens.
type _wcfs_ZheadH struct {
nodefs.File // = .sk.file
sk *FileSock
}
func (*_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
// TODO(?) check flags // TODO(?) check flags
sk := NewFileSock() sk := NewFileSock()
sk.CloseRead() sk.CloseRead()
zh := &_wcfs_ZheadH{
File: sk.file,
sk: sk,
}
// TODO del zheadSockTab[sk] on sk.File.Release (= client drops opened handle)
gdebug.zheadSockTabMu.Lock() // TODO +fctx -> cancel gdebug.zheadSockTabMu.Lock() // TODO +fctx -> cancel
gdebug.zheadSockTab[sk] = struct{}{} gdebug.zheadSockTab[sk] = struct{}{}
gdebug.zheadSockTabMu.Unlock() gdebug.zheadSockTabMu.Unlock()
return sk.File(), fuse.OK return WithOpenStreamFlags(zh), fuse.OK
}
func (zh *_wcfs_ZheadH) Release() {
gdebug.zheadSockTabMu.Lock()
delete(gdebug.zheadSockTab, zh.sk)
gdebug.zheadSockTabMu.Unlock()
zh.File.Release()
} }
......
...@@ -412,7 +412,12 @@ class tWCFS(_tWCFS): ...@@ -412,7 +412,12 @@ class tWCFS(_tWCFS):
# need to assert that the counters stay in expected state to make sure that # need to assert that the counters stay in expected state to make sure that
# no extra event happened. For instance values we need to assert # no extra event happened. For instance values we need to assert
# eventually as well, because in many cases OS kernel sends events to wcfs # eventually as well, because in many cases OS kernel sends events to wcfs
# asynchronously after client triggers an action. # asynchronously after client triggers an action. For example for ZHeadLink
# after client closes corresponding file handle, the kernel sends RELEASE
# to wcfs asynchronously, and it is only after that final RELEASE when wcfs
# removes corresponding entry from zheadSockTab. So if we would assert on
# instance values immediately after close, it could happen before wcfs
# received corresponding RELEASE and the assertion would fail.
# #
# Note that the set of keys in kvok can be smaller than the full set of keys in stats. # Note that the set of keys in kvok can be smaller than the full set of keys in stats.
def assertStats(t, kvok): def assertStats(t, kvok):
...@@ -513,6 +518,9 @@ class tDB(tWCFS): ...@@ -513,6 +518,9 @@ class tDB(tWCFS):
t._files = set() t._files = set()
t._wlinks = set() t._wlinks = set()
t.assertStats({'ZHeadLink': 1})
@property @property
def head(t): def head(t):
return t.dFtail[-1].rev return t.dFtail[-1].rev
...@@ -533,7 +541,7 @@ class tDB(tWCFS): ...@@ -533,7 +541,7 @@ class tDB(tWCFS):
assert len(t._wlinks) == 0 assert len(t._wlinks) == 0
t._wc_zheadfh.close() t._wc_zheadfh.close()
t.assertStats({'PinnedBlk': 0}) # FIXME + WatchLink, Watch, ZHeadLink t.assertStats({'PinnedBlk': 0, 'ZHeadLink': 0}) # FIXME + WatchLink, Watch
# open opens wcfs file corresponding to zf@at and starts to track it. # open opens wcfs file corresponding to zf@at and starts to track it.
# see returned tFile for details. # see returned tFile for details.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment