Commit b15ad994 authored by Levin Zimmermann's avatar Levin Zimmermann

.

parent 9157390d
......@@ -37,7 +37,6 @@ import (
"github.com/shirou/gopsutil/v4/process"
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/hanwen/go-fuse/v2/fuse/nodefs"
"github.com/elastic/go-sysinfo"
"github.com/pkg/errors"
"lab.nexedi.com/kirr/go123/xerr"
......@@ -548,7 +547,7 @@ func panicf(format string, argv ...interface{}) {
//
// NOTE: starting from go1.23 it, via os.FindProcess, uses pidfd which avoids potential
// race of later signalling to pid of already long-gone and replaced process.
func findAliveProcess(pid int) (_ *sysinfo.types.Process, err error) {
func findAliveProcess(pid int) (_ *os.Process, err error) {
defer xerr.Contextf(&err, "findAlive pid%d", pid)
proc, err := os.FindProcess(pid)
......@@ -565,10 +564,6 @@ func findAliveProcess(pid int) (_ *sysinfo.types.Process, err error) {
proc.Release()
return nil, syscall.ESRCH
}
proc, err := sysinfo.Process(proc.Pid)
if err != nil {
return nil, err
}
return proc, nil
}
......
......@@ -516,6 +516,8 @@ import (
"github.com/johncgriffin/overflow"
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/hanwen/go-fuse/v2/fuse/nodefs"
"github.com/elastic/go-sysinfo"
"github.com/elastic/go-sysinfo/types"
"github.com/pkg/errors"
"lab.nexedi.com/nexedi/wendelin.core/wcfs/internal/xzodb"
......@@ -697,7 +699,7 @@ type WatchLink struct {
down chan struct{} // ready after shutdown completes
pinWG sync.WaitGroup // all pin handlers are accounted here
client *os.Process // client that opened the WatchLink
client Client // client that opened the WatchLink
}
// Watch represents watching for changes to 1 BigFile over particular watch link.
......@@ -742,6 +744,24 @@ type Stats struct {
pinkill atomic.Int64 // # of times a client was killed due to badly handling pin
}
// Client represents a client of WCFS server.
type Client struct {
proc *os.Process // Holds process to send signals to
info types.ProcessInfo // Provides information about process
user types.UserInfo // Provides information about the user that owns the client process
}
func (client Client) Format () string {
return fmt.Sprintf("process(PID=%v;exe=%v;UID=%v)", client.info.PID, client.info.Exe, client.user.UID)
}
func debugClient(client Client, format string, argv ...interface{}) {
if !log.V(2) {
return
}
log.InfoDepth(1, fmt.Sprintf(client.Format() + ": " + format, argv...))
}
// -------- ZODB cache control --------
......@@ -1595,11 +1615,8 @@ func (w *Watch) __pin(ctx context.Context, blk int64, rev zodb.Tid) (err error)
// continue to provide correct uncorrupted data to it. The filesystem is
// switched to EIO mode in such case.
func (wlink *WatchLink) badPinKill(reason error) {
pid := wlink.client.Pid
logf := func(format string, argv ...any) {
emsg := fmt.Sprintf("pid%d: ", pid)
emsg += fmt.Sprintf(format, argv...)
emsg := wlink.client.Format() + ": " + fmt.Sprintf(format, argv...)
log.Error(emsg)
}
logf("client failed to handle pin notification correctly and timely in %s: %s", groot.pinTimeout, reason)
......@@ -1619,7 +1636,7 @@ func (wlink *WatchLink) badPinKill(reason error) {
func (wlink *WatchLink) _badPinKill() error {
client := wlink.client
pid := client.Pid
pid := client.proc.Pid
// time budget for pin + wait + fatal-notify + kill = pinTimeout + 1 + 1/3·pinTimeout
// < 2 ·pinTimeout if pinTimeout > 3/2
......@@ -1640,12 +1657,12 @@ func (wlink *WatchLink) _badPinKill() error {
// siginfo structure. It would be good if we can mimic that behaviour to a
// reasonable extent if possible."
log.Errorf("pid%d: <- SIGBUS", pid)
err := client.Signal(syscall.SIGBUS)
err := client.proc.Signal(syscall.SIGBUS)
if err != nil {
return err
}
ok, err := waitProcessEnd(ctx1, client)
ok, err := waitProcessEnd(ctx1, client.proc)
if err != nil && !errors.Is(err, ctx1.Err()) {
return err
}
......@@ -1655,12 +1672,12 @@ func (wlink *WatchLink) _badPinKill() error {
log.Errorf("pid%d: is still alive after SIGBUS", pid)
log.Errorf("pid%d: <- SIGKILL", pid)
err = client.Signal(syscall.SIGKILL)
err = client.proc.Signal(syscall.SIGKILL)
if err != nil {
return err
}
ok, err = waitProcessEnd(ctx2, client)
ok, err = waitProcessEnd(ctx2, client.proc)
if err != nil && !errors.Is(err, ctx2.Err()) {
return err
}
......@@ -1773,6 +1790,7 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb
// It sends "pin" notifications; final "ok" or "error" must be sent by caller.
func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.Tid) (err error) {
defer xerr.Contextf(&err, "setup watch f<%s> @%s", foid, at)
debugClient(wlink.client, "setup watch oid=%v ; tid=%v", foid, at)
head := wlink.head
bfdir := head.bfdir
......@@ -1990,6 +2008,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
return err // should not fail
}
debugClient(wlink.client, "successfully setup watch oid=%v ; tid=%v", foid, at)
return nil
}
......@@ -2005,14 +2024,32 @@ func (wnode *WatchNode) open(flags uint32, fctx *fuse.Context) (_ nodefs.File, e
// TODO(?) check flags
head := wnode.head
// remember our client who opened the watchlink.
// remember process of our client who opened the watchlink.
// We will need to kill the client if it will be e.g. slow to respond to pin notifications.
client, err := findAliveProcess(int(fctx.Caller.Pid))
proc, err := findAliveProcess(int(fctx.Caller.Pid))
if err != nil {
return nil, err
}
// fetch info of client process for logging
syslog_proc, err := sysinfo.Process(proc.Pid)
if err != nil {
return nil, err
}
proc_info, err := syslog_proc.Info()
if err != nil {
return nil, err
}
user_info, err := syslog_proc.User()
if err != nil {
return nil, err
}
client := Client{proc, proc_info, user_info}
serveCtx, serveCancel := context.WithCancel(context.TODO() /*TODO ctx of wcfs running*/)
debugClient(client, "Open WatchLink")
wlink := &WatchLink{
sk: NewFileSock(),
id: atomic.AddInt32(&wnode.idNext, +1),
......@@ -2029,7 +2066,7 @@ func (wnode *WatchNode) open(flags uint32, fctx *fuse.Context) (_ nodefs.File, e
head.wlinkTab[wlink] = struct{}{}
head.wlinkMu.Unlock()
go wlink.serve(serveCtx)
go wlink.serve(serveCtx, int(fctx.Caller.Pid))
return wlink.sk.File(), nil
}
......@@ -2042,6 +2079,7 @@ func (wnode *WatchNode) open(flags uint32, fctx *fuse.Context) (_ nodefs.File, e
//
// NOTE shutdown can be invoked under atMu.R from pin.
func (wlink *WatchLink) shutdown(reason error) {
debugClient(wlink.client, "shutdown watchlink")
wlink.down1.Do(func() {
// mark wlink as down; this signals serve loop to exit and cancels all in-progress pins
wlink.serveCancel()
......@@ -2079,10 +2117,14 @@ func (wlink *WatchLink) shutdown(reason error) {
// serve serves client initiated watch requests and routes client replies to
// wcfs initiated pin requests.
func (wlink *WatchLink) serve(ctx context.Context) {
func (wlink *WatchLink) serve(ctx context.Context, clientpid int) {
debugClient(wlink.client, "start serving")
err := wlink._serve(ctx)
if err != nil {
debugClient(wlink.client, "stop serving client with error")
log.Error(err)
} else {
debugClient(wlink.client, "stop serving client without error")
}
}
......@@ -2091,6 +2133,8 @@ func (wlink *WatchLink) _serve(ctx context.Context) (err error) {
// final watchlink cleanup is done on serve exit
defer func() {
debugClient(wlink.client, "cleanup watchlink")
// unregister all watches created on this wlink
wlink.byfileMu.Lock()
for _, w := range wlink.byfile {
......@@ -2115,7 +2159,7 @@ func (wlink *WatchLink) _serve(ctx context.Context) (err error) {
}
// release client process
wlink.client.Release()
wlink.client.proc.Release()
}()
// watch handlers are spawned in dedicated workgroup
......@@ -2223,6 +2267,7 @@ func (wlink *WatchLink) _serve(ctx context.Context) (err error) {
// returned error comes without full error prefix.
func (wlink *WatchLink) handleWatch(ctx context.Context, stream uint64, msg string) (err error) {
defer xerr.Contextf(&err, "%d", stream)
debugClient(wlink.client, "handleWatch: %v", msg)
err = wlink._handleWatch(ctx, msg)
reply := "ok"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment