Commit b4cabad6 authored by Kirill Smelkov's avatar Kirill Smelkov

wcfs: Add .wcfs/stats file with basic usage statistics

Report there number of inside-WCFS instances, e.g. number of tracked
BigFiles, WatchLinks etc, and also number of counted events, for example
how many times a pin event happened.

Soon we will need this statistics to implement tests e.g. for pinkilling
and other functionalities, and it might be also useful to have in general.
parent 380a8793
...@@ -542,6 +542,9 @@ type Root struct { ...@@ -542,6 +542,9 @@ type Root struct {
// directories + ZODB connections for @<rev>/ // directories + ZODB connections for @<rev>/
revMu sync.Mutex revMu sync.Mutex
revTab map[zodb.Tid]*Head revTab map[zodb.Tid]*Head
// collected statistics
stats *Stats
} }
// /(head|<rev>)/ - served by Head. // /(head|<rev>)/ - served by Head.
...@@ -704,6 +707,14 @@ type blkPinState struct { ...@@ -704,6 +707,14 @@ type blkPinState struct {
err error err error
} }
// Stats keeps collected statistics.
//
// The statistics is accessible via .wcfs/stats file served by _wcfs_Stats.
type Stats struct {
pin atomic.Int64 // # of times wcfs issued pin request
}
// -------- ZODB cache control -------- // -------- ZODB cache control --------
// zodbCacheControl implements zodb.LiveCacheControl to tune ZODB to never evict // zodbCacheControl implements zodb.LiveCacheControl to tune ZODB to never evict
...@@ -1472,6 +1483,7 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { ...@@ -1472,6 +1483,7 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
// perform IO without w.pinnedMu // perform IO without w.pinnedMu
w.pinnedMu.Unlock() w.pinnedMu.Unlock()
groot.stats.pin.Add(1)
ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, revstr)) ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, revstr))
w.pinnedMu.Lock() w.pinnedMu.Lock()
...@@ -2362,6 +2374,74 @@ func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse ...@@ -2362,6 +2374,74 @@ func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse
return sk.File(), fuse.OK return sk.File(), fuse.OK
} }
// _wcfs_Stats serves .wcfs/stats reads.
//
// In the output:
//
// - entries that start with capital letter, e.g. "Watch", indicate current
// number of named instances. This numbers can go up and down.
//
// - entries that start with lowercase letter, e.g. "pin", indicate number of
// named event occurrences. This numbers are cumulative counters and should
// never go down.
func _wcfs_Stats(fctx *fuse.Context) ([]byte, error) {
stats := ""
num := func(name string, value any) {
stats += fmt.Sprintf("%s\t: %d\n", name, value)
}
root := groot
head := root.head
bfdir := head.bfdir
// dump information detected at runtime
root.revMu.Lock()
lenRevTab := len(root.revTab)
root.revMu.Unlock()
head.wlinkMu.Lock()
ΣWatch := 0
ΣPinnedBlk := 0
lenWLinkTab := len(head.wlinkTab)
for wlink := range head.wlinkTab {
wlink.byfileMu.Lock()
ΣWatch += len(wlink.byfile)
for _, w := range wlink.byfile {
w.atMu.RLock()
w.pinnedMu.Lock()
ΣPinnedBlk += len(w.pinned)
w.pinnedMu.Unlock()
w.atMu.RUnlock()
}
wlink.byfileMu.Unlock()
}
head.wlinkMu.Unlock()
head.zheadMu.RLock()
bfdir.fileMu.Lock()
lenFileTab := len(bfdir.fileTab)
bfdir.fileMu.Unlock()
head.zheadMu.RUnlock()
gdebug.zheadSockTabMu.Lock()
lenZHeadSockTab := len(gdebug.zheadSockTab)
gdebug.zheadSockTabMu.Unlock()
num("BigFile", lenFileTab) // # of head/BigFile
num("RevHead", lenRevTab) // # of @revX/ directories
num("ZHeadLink", lenZHeadSockTab) // # of open .wcfs/zhead handles
num("WatchLink", lenWLinkTab) // # of open watchlinks
num("Watch", ΣWatch) // # of setup watches
num("PinnedBlk", ΣPinnedBlk) // # of currently on-client pinned blocks
// dump information collected in root.stats
s := root.stats
num("pin", s.pin.Load())
return []byte(stats), nil
}
// TODO -> enable/disable fuse debugging dynamically (by write to .wcfs/debug ?) // TODO -> enable/disable fuse debugging dynamically (by write to .wcfs/debug ?)
func main() { func main() {
...@@ -2465,6 +2545,7 @@ func _main() (err error) { ...@@ -2465,6 +2545,7 @@ func _main() (err error) {
zdb: zdb, zdb: zdb,
head: head, head: head,
revTab: make(map[zodb.Tid]*Head), revTab: make(map[zodb.Tid]*Head),
stats: &Stats{},
} }
opts := &fuse.MountOptions{ opts := &fuse.MountOptions{
...@@ -2532,6 +2613,10 @@ func _main() (err error) { ...@@ -2532,6 +2613,10 @@ func _main() (err error) {
fsNode: newFSNode(fSticky), fsNode: newFSNode(fSticky),
}) })
// .wcfs/stats - special file with collected statistics.
mkfile(&_wcfs, "stats", NewSmallFile(_wcfs_Stats))
// TODO handle autoexit // TODO handle autoexit
// (exit when kernel forgets all our inodes - wcfs.py keeps .wcfs/zurl // (exit when kernel forgets all our inodes - wcfs.py keeps .wcfs/zurl
// opened, so when all inodes has been forgotten - we know all wcfs.py clients exited) // opened, so when all inodes has been forgotten - we know all wcfs.py clients exited)
......
...@@ -367,6 +367,11 @@ class tWCFS(_tWCFS): ...@@ -367,6 +367,11 @@ class tWCFS(_tWCFS):
go(t._abort_ontimeout, t._wcfuseabort, 10*time.second, nogilready) # NOTE must be: with_timeout << · << wcfs_pin_timeout go(t._abort_ontimeout, t._wcfuseabort, 10*time.second, nogilready) # NOTE must be: with_timeout << · << wcfs_pin_timeout
nogilready.recv() # wait till _abort_ontimeout enters nogil nogilready.recv() # wait till _abort_ontimeout enters nogil
t._stats_prev = None
t.assertStats({'BigFile': 0, 'RevHead': 0, 'ZHeadLink': 0,
'WatchLink': 0, 'Watch': 0, 'PinnedBlk': 0,
'pin': 0})
# _abort_ontimeout is in wcfs_test.pyx # _abort_ontimeout is in wcfs_test.pyx
# close closes connection to wcfs, unmounts the filesystem and makes sure # close closes connection to wcfs, unmounts the filesystem and makes sure
...@@ -394,6 +399,69 @@ class tWCFS(_tWCFS): ...@@ -394,6 +399,69 @@ class tWCFS(_tWCFS):
defer(t.wc.close) defer(t.wc.close)
assert is_mountpoint(t.wc.mountpoint) assert is_mountpoint(t.wc.mountpoint)
# assertStats asserts that content of .wcfs/stats eventually reaches expected state.
#
# For all keys k from kvok it verifies that eventually stats[k] == kvok[k]
# and that it stays that way.
#
# The state is asserted eventually instead of immediately - for both
# counters and instance values - because wcfs increments a counter
# _after_ corresponding event happened,
# and the tests can start to observe that state
# before wcfs actually does counter increment. For the similar reason we
# need to assert that the counters stay in expected state to make sure that
# no extra event happened. For instance values we need to assert
# eventually as well, because in many cases OS kernel sends events to wcfs
# asynchronously after client triggers an action.
#
# Note that the set of keys in kvok can be smaller than the full set of keys in stats.
def assertStats(t, kvok):
# kstats loads stats subset with kvok keys.
def kstats():
stats = t._loadStats()
kstats = {}
for k in kvok.keys():
kstats[k] = stats.get(k, None)
return kstats
# wait till stats reaches expected state
ctx = timeout()
while 1:
kv = kstats()
if kv == kvok:
break
if ctx.err() is not None:
assert kv == kvok, "stats did not reach expected state"
tdelay()
# make sure that it stays that way for some time
# we do not want to make the assertion time big because it will results
# in slowing down all tests
for _ in range(3):
tdelay()
kv = kstats()
assert kv == kvok, "stats did not stay at expected state"
# _loadStats loads content of .wcfs/stats .
def _loadStats(t): # -> {}
stats = {}
for l in t.wc._read(".wcfs/stats").splitlines():
# key : value
k, v = l.split(':')
k = k.strip()
v = v.strip()
stats[k] = int(v)
# verify that keys remains the same and that cumulative counters do not decrease
if t._stats_prev is not None:
assert stats.keys() == t._stats_prev.keys()
for k in stats.keys():
if k[0].islower():
assert stats[k] >= t._stats_prev[k], k
t._stats_prev = stats
return stats
class tDB(tWCFS): class tDB(tWCFS):
# __init__ initializes test database and wcfs. # __init__ initializes test database and wcfs.
...@@ -465,6 +533,8 @@ class tDB(tWCFS): ...@@ -465,6 +533,8 @@ class tDB(tWCFS):
assert len(t._wlinks) == 0 assert len(t._wlinks) == 0
t._wc_zheadfh.close() t._wc_zheadfh.close()
t.assertStats({'PinnedBlk': 0}) # FIXME + WatchLink, Watch, ZHeadLink
# open opens wcfs file corresponding to zf@at and starts to track it. # open opens wcfs file corresponding to zf@at and starts to track it.
# see returned tFile for details. # see returned tFile for details.
def open(t, zf, at=None): # -> tFile def open(t, zf, at=None): # -> tFile
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment