From be3e090cd68ee1c082ffc55eead46cd59c02270d Mon Sep 17 00:00:00 2001
From: Kirill Smelkov <kirr@nexedi.com>
Date: Sun, 31 Mar 2019 16:53:26 +0300
Subject: [PATCH] .

---
 wcfs/wcfs.go | 58 ++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 43 insertions(+), 15 deletions(-)

diff --git a/wcfs/wcfs.go b/wcfs/wcfs.go
index 4dbf273c..d2ba96ff 100644
--- a/wcfs/wcfs.go
+++ b/wcfs/wcfs.go
@@ -563,9 +563,9 @@ type WatchLink struct {
 	fileTab map[zodb.Oid]*Watch // {} foid -> Watch
 
 	// IO
-//	acceptq chan string // (stream, msg)   // client-initiated messages go here
-	rxMu    sync.Mutex
-	rxTab   map[uint64]chan string // client replies go via here
+	txMu  sync.Mutex
+	rxMu  sync.Mutex
+	rxTab map[uint64]chan string // client replies go via here
 }
 
 // Watch represents watching for changes to 1 BigFile over particular watch link.
@@ -1260,6 +1260,7 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
 	}
 
 	// XXX comment
+	// XXX file.δtail has not full info
 	rev, _ = w.file.δtail.LastRevOf(blk, w.at)
 
 	ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, rev))
@@ -1277,6 +1278,8 @@ func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
 
 // setupWatch sets up a Watch when client sends `watch <file> @<at>` request.
 //
+// XXX sends "pin" notifications; final "ok" must be sent by caller.
+//
 // XXX called synchronously - only 1 setupWatch call at a time?
 func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.Tid) (err error) {
 	defer xerr.Contextf(&err, "setup watch f<%s> @%s", foid, at)
@@ -1287,7 +1290,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
 	// XXX if watch was already established - we need to update it
 	w := wlink.fileTab[foid]
 	if w != nil {
-		// XXX update the watch
+		panic("TODO")	// XXX update the watch
 	}
 
 	// watch was not previously established - set it up anew
@@ -1307,7 +1310,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
 		return fmt.Errorf("at is too far away back from head/at")
 	}
 
-	toPin := map[int64]zodb.Tid // blk -> @rev
+	toPin := map[int64]zodb.Tid{} // blk -> @rev
 
 	// XXX f.δtail.Head() not neccessarily = head.At()
 	// (if f was not changed by a txn, f.δtail stays not updated)	XXX correct?
@@ -1331,7 +1334,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
 		blk := blk
 		rev := rev
 		wg.Go(func() error {
-			return wlink.pin(ctx, blk, rev)
+			return w.pin(ctx, blk, rev)
 		})
 	}
 	err = wg.Wait()
@@ -1339,7 +1342,9 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
 		panic(err)	// XXX
 	}
 
-	// XXX something else?
+	// XXX register w to f (here ?)
+
+	return nil
 }
 
 // Open serves /head/watch opens.
@@ -1362,8 +1367,8 @@ func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fus
 	return wlink.sk.File(), fuse.OK
 }
 
-// serveRX serves client originated watch requests and routes client replies to
-// wcfs originated requests.
+// serveRX serves client initiated watch requests and routes client replies to
+// wcfs initiated requests.
 func (wlink *WatchLink) serveRX() {
 	err := wlink._serveRX()
 	_ = err
@@ -1414,17 +1419,23 @@ func (wlink *WatchLink) _serveRX() (err error) {
 			return fmt.Errorf("%d: %s", stream, err)
 		}
 
-		err = wlink.setupWatch(context.TODO(), foid, at)	// XXX ctx = ?
+		ctx := context.TODO()	// XXX ctx = ?
+
+		err = wlink.setupWatch(ctx, foid, at)
 		if err != nil {
 			return fmt.Errorf("%d: %s", stream, err)
 		}
+
+		err = wlink.send(ctx, stream, "ok")
+		if err != nil {
+			panic(err)	// XXX
+		}
 	}
 }
 
 // sendReq sends wcfs-originated request to client and returns client response.
 func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, err error) {
 	// XXX err ctx
-	// XXX assert '\n' not in req
 	stream := uint64(2) // FIXME allocate stream anew as several in-flight sendReq are possible
 
 	rxq := make(chan string) // XXX cap=1? (so that if we return canceled we do not block client)
@@ -1432,10 +1443,7 @@ func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string,
 	wlink.rxTab[stream] = rxq	// XXX assert .stream is not there?
 	wlink.rxMu.Unlock()
 
-	// XXX lock tx
-	// XXX timeout write on ctx cancel
-
-	_, err = wlink.sk.Write([]byte(fmt.Sprintf("%d %s\n", stream, req)))
+	err = wlink.send(ctx, stream, req)
 	if err != nil {
 		return "", err
 	}
@@ -1449,6 +1457,26 @@ func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string,
 	}
 }
 
+// send sends a message message to client over specified stream ID.
+//
+// Multiple send can be called simultaneously; send serializes writes.
+func (wlink *WatchLink) send(ctx context.Context, stream uint64, msg string) error {
+	// XXX err ctx
+	// XXX assert '\n' not in msg
+
+	wlink.txMu.Lock()
+	defer wlink.txMu.Unlock()
+
+	// XXX timeout write on ctx cancel
+	pkt := []byte(fmt.Sprintf("%d %s\n", stream, msg))
+	_, err := wlink.sk.Write(pkt)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
 
 // ---- Lookup ----
 
-- 
2.30.9