Commit b17aeb8c authored by Kirill Smelkov's avatar Kirill Smelkov

X Change FileSock to use xio.Pipe which is io.Pipe + support for IO cancellation

We need to do this because when e.g. a thread in client process dies and
wants to abort the whole process, kernel sends FUSE INTERRUPT request to
interrup READ syscall on-client pinner is waiting on for /head/watch.
And if this cancellation is not handled, the client process is left hang
forever and even kill -9 does not stop it.

xio.Pipe to be committed/pushed yet to go123.

test_wcfs_watch_robust becomes broken for now.
parent 00110399
...@@ -39,6 +39,7 @@ import ( ...@@ -39,6 +39,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"lab.nexedi.com/kirr/go123/xcontext" "lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xio"
"lab.nexedi.com/kirr/neo/go/transaction" "lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
...@@ -272,8 +273,8 @@ func mount(mntpt string, root nodefs.Node, opts *fuse.MountOptions) (*fuse.Serve ...@@ -272,8 +273,8 @@ func mount(mntpt string, root nodefs.Node, opts *fuse.MountOptions) (*fuse.Serve
// are correspondingly matched with read/write operations on filesystem user side. // are correspondingly matched with read/write operations on filesystem user side.
type FileSock struct { type FileSock struct {
file *skFile // filesock's file peer file *skFile // filesock's file peer
rx *io.PipeReader // socket reads from file here rx *xio.PipeReader // socket reads from file here
tx *io.PipeWriter // socket writes to file here tx *xio.PipeWriter // socket writes to file here
} }
// skFile is File peer of FileSock. // skFile is File peer of FileSock.
...@@ -286,8 +287,8 @@ type FileSock struct { ...@@ -286,8 +287,8 @@ type FileSock struct {
type skFile struct { type skFile struct {
nodefs.File nodefs.File
rx *io.PipeReader // file reads from socket here rx *xio.PipeReader // file reads from socket here
tx *io.PipeWriter // file writes to socket here tx *xio.PipeWriter // file writes to socket here
} }
// NewFileSock creates new file socket. // NewFileSock creates new file socket.
...@@ -301,11 +302,11 @@ func NewFileSock() *FileSock { ...@@ -301,11 +302,11 @@ func NewFileSock() *FileSock {
} }
sk.file = f sk.file = f
rx, tx := io.Pipe() rx, tx := xio.Pipe()
sk.rx = rx sk.rx = rx
f .tx = tx f .tx = tx
rx, tx = io.Pipe() rx, tx = xio.Pipe()
f .rx = rx f .rx = rx
sk.tx = tx sk.tx = tx
...@@ -332,15 +333,15 @@ func (sk *FileSock) File() nodefs.File { ...@@ -332,15 +333,15 @@ func (sk *FileSock) File() nodefs.File {
// Write writes data to filesock. // Write writes data to filesock.
// //
// The data will be read by client reading from filesock's file. // The data will be read by client reading from filesock's file.
// Write semantic is that of io.Writer. // Write semantic is that of xio.Writer.
func (sk *FileSock) Write(data []byte) (n int, err error) { func (sk *FileSock) Write(ctx context.Context, data []byte) (n int, err error) {
// XXX err ctx? // XXX err ctx?
return sk.tx.Write(data) return sk.tx.Write(ctx, data)
} }
// Read implements nodefs.File and is paired with filesock.Write(). // Read implements nodefs.File and is paired with filesock.Write().
func (f *skFile) Read(dest []byte, /*ignored*/off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) { func (f *skFile) Read(dest []byte, /*ignored*/off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
n, err := f.rx.Read(dest) // XXX fctx.cancel n, err := f.rx.Read(fctx, dest)
if n != 0 { if n != 0 {
err = nil err = nil
} }
...@@ -358,10 +359,10 @@ func (f *skFile) Read(dest []byte, /*ignored*/off int64, fctx *fuse.Context) (fu ...@@ -358,10 +359,10 @@ func (f *skFile) Read(dest []byte, /*ignored*/off int64, fctx *fuse.Context) (fu
// Read reads data from filesock. // Read reads data from filesock.
// //
// The data read will be that the client writes into filesock's file. // The data read will be that the client writes into filesock's file.
// Read semantic is that of io.Reader. // Read semantic is that of xio.Reader.
func (sk *FileSock) Read(dest []byte) (n int, err error) { func (sk *FileSock) Read(ctx context.Context, dest []byte) (n int, err error) {
// XXX err ctx? // XXX err ctx?
return sk.rx.Read(dest) return sk.rx.Read(ctx, dest)
} }
// Write implements nodefs.File and is paired with filesock.Read() // Write implements nodefs.File and is paired with filesock.Read()
...@@ -373,7 +374,7 @@ func (f *skFile) Write(data []byte, /*ignored*/off int64, fctx *fuse.Context) (u ...@@ -373,7 +374,7 @@ func (f *skFile) Write(data []byte, /*ignored*/off int64, fctx *fuse.Context) (u
data = data[:l] data = data[:l]
} }
n, err := f.tx.Write(data) // XXX fctx.cancel n, err := f.tx.Write(fctx, data)
if n != 0 { if n != 0 {
err = nil err = nil
} }
......
...@@ -491,6 +491,7 @@ import ( ...@@ -491,6 +491,7 @@ import (
"lab.nexedi.com/kirr/go123/xcontext" "lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xio"
"lab.nexedi.com/kirr/neo/go/transaction" "lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
...@@ -924,7 +925,7 @@ retry: ...@@ -924,7 +925,7 @@ retry:
// notify .wcfs/zhead // notify .wcfs/zhead
for sk := range gdebug.zheadSockTab { for sk := range gdebug.zheadSockTab {
_, err := fmt.Fprintf(sk, "%s\n", δZ.Tid) _, err := fmt.Fprintf(xio.BindWriter(sk, ctx), "%s\n", δZ.Tid)
if err != nil { if err != nil {
log.Errorf("%s", err) // XXX errctx + file, handle, reader pid log.Errorf("%s", err) // XXX errctx + file, handle, reader pid
sk.Close() sk.Close()
...@@ -1731,13 +1732,14 @@ func (wlink *WatchLink) serve() { ...@@ -1731,13 +1732,14 @@ func (wlink *WatchLink) serve() {
func (wlink *WatchLink) _serve() (err error) { func (wlink *WatchLink) _serve() (err error) {
defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id) defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id)
r := bufio.NewReader(wlink.sk)
ctx0 := context.TODO() // XXX ctx = ? -> merge(ctx of wcfs running, ctx of wlink timeout) ctx0 := context.TODO() // XXX ctx = ? -> merge(ctx of wcfs running, ctx of wlink timeout)
ctx, cancel := context.WithCancel(ctx0) ctx, cancel := context.WithCancel(ctx0)
wg, ctx := errgroup.WithContext(ctx) wg, ctx := errgroup.WithContext(ctx)
r := bufio.NewReader(xio.BindReader(wlink.sk, ctx))
defer func() { defer func() {
// cancel all handlers on both error and ok return. // cancel all handlers on both error and ok return.
// ( ok return is e.g. when we received "bye", so if client // ( ok return is e.g. when we received "bye", so if client
...@@ -1908,10 +1910,9 @@ func (wlink *WatchLink) send(ctx context.Context, stream uint64, msg string) err ...@@ -1908,10 +1910,9 @@ func (wlink *WatchLink) send(ctx context.Context, stream uint64, msg string) err
wlink.txMu.Lock() wlink.txMu.Lock()
defer wlink.txMu.Unlock() defer wlink.txMu.Unlock()
// XXX timeout write on ctx cancel
pkt := []byte(fmt.Sprintf("%d %s\n", stream, msg)) pkt := []byte(fmt.Sprintf("%d %s\n", stream, msg))
fmt.Printf("S: wlink %d: tx: %q\n", wlink.id, pkt) fmt.Printf("S: wlink %d: tx: %q\n", wlink.id, pkt)
_, err := wlink.sk.Write(pkt) _, err := wlink.sk.Write(ctx, pkt)
if err != nil { if err != nil {
return err return err
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment