From 3725aa97ba30508a8ffcf3926174794699706ac5 Mon Sep 17 00:00:00 2001
From: Kirill Smelkov <kirr@nexedi.com>
Date: Fri, 29 Mar 2019 20:34:54 +0300
Subject: [PATCH] .

---
 wcfs/wcfs.go | 54 +++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 41 insertions(+), 13 deletions(-)

diff --git a/wcfs/wcfs.go b/wcfs/wcfs.go
index d4c591b..434f250 100644
--- a/wcfs/wcfs.go
+++ b/wcfs/wcfs.go
@@ -493,7 +493,7 @@ type Watcher struct {
 	head *Head
 
 	// established file watchers.
-	// XXX in-progress - where?
+	// XXX in-progress - where?	-> nowhere; here only established watches are added
 	// XXX locking?
 	fileTab map[*FileWatch]struct{}
 
@@ -1262,7 +1262,7 @@ func (w *FileWatch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error
 	// XXX comment
 	rev, _ = w.file.δFtail.LastRevOf(blk, w.at)
 
-	ack, err := w.link.send(ctx, fmt.Sprintf("pin %s #%s @%s", foid, blk, rev))
+	ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%s @%s", foid, blk, rev))
 	if err != nil {
 		return err
 	}
@@ -1294,14 +1294,14 @@ func (watch *Watch) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.St
 	// XXX del watchTab[w] on w.sk.File.Release
 	head.watchTab[w] = struct{}{}
 
-	go w.serve()
+	go w.serveRX()
 	return w.sk.File(), fuse.OK
 }
 
-// serve serves client originated watch requests.
-// XXX serves rx? (-> and routes client replies ...)
-func (w *Watcher) serve() {
-	err := w._serve()
+// serveRX serves client originated watch requests and routes client replies to
+// wcfs originated requests.
+func (w *Watcher) serveRX() {
+	err := w._serveRX()
 	_ = err
 	// XXX log error if !close
 	// XXX close if was not closed?
@@ -1309,15 +1309,15 @@ func (w *Watcher) serve() {
 	delete(w.head.watchTab, w)
 }
 
-func (w *Watcher) _serve() (err error) {
-	defer xerr.Contextf(&err, "watcher %d: serve", w.id)
+func (w *Watcher) _serveRX() (err error) {
+	defer xerr.Contextf(&err, "watcher %d: serve rx", w.id)
 	r := bufio.NewReader(w.sk)
 
 	// XXX write to peer if it was logical error on client side
 	// XXX on which stream? -1?
 
 	for {
-		l, err := r.ReadString('\n')	// XXX limit accepted line len not to DOS
+		l, err := r.ReadString('\n')	// XXX limit accepted line len to prevent DOS
 		if err != nil {
 			return err
 		}
@@ -1326,7 +1326,7 @@ func (w *Watcher) _serve() (err error) {
 
 		stream, msg, err := parseWatchFrame(l)
 		if err != nil {
-			return fmt.Errorf("rx: %s", err)
+			return fmt.Errorf("%s", err)
 		}
 
 		// reply from client to to wcfs
@@ -1338,7 +1338,7 @@ func (w *Watcher) _serve() (err error) {
 			w.rxMu.Unlock()
 
 			if rxq == nil {
-				return fmt.Errorf("rx %d: reply on unexpected stream", stream)
+				return fmt.Errorf("%d: reply on unexpected stream", stream)
 			}
 			rxq <- msg
 			continue
@@ -1347,7 +1347,7 @@ func (w *Watcher) _serve() (err error) {
 		// client-initiated request
 		oid, at, err := parseWatch(msg)
 		if err != nil {
-			return fmt.Errorf("rx %d: %s", stream, err)
+			return fmt.Errorf("%d: %s", stream, err)
 		}
 
 		_ = oid
@@ -1360,6 +1360,34 @@ func (w *Watcher) _serve() (err error) {
 	}
 }
 
+// sendReq sends wcfs-originated request to client and returns client response.
+func (w *Watcher) sendReq(ctx context.Context, req string) (reply string, err error) {
+	// XXX err ctx
+	// XXX assert '\n' not in req
+	stream = ...	// XXX
+
+	rxq := make(chan string) // XXX cap=1? (so that if we return canceled we do not block client)
+	w.rxMu.Lock()
+	w.rxTab[stream] = rxq	// XXX assert .stream is not there?
+	w.rxMu.Unlock()
+
+	// XXX lock tx
+	// XXX timeout write on ctx cancel
+
+	err = w.sk.Write(fmt.Sprintf("%d %s\n", stream, req))
+	if err != nil {
+		return "", err
+	}
+
+	select {
+	case <-ctx.Done():
+		return "", ctx.Err()
+
+	case reply = <-rxq:
+		return reply, nil
+	}
+}
+
 
 // ---- Lookup ----
 
-- 
2.30.9