From bbfb50062b2e0152ec88703b9e9cf49336e34c4d Mon Sep 17 00:00:00 2001
From: Kirill Smelkov <kirr@nexedi.com>
Date: Wed, 21 Mar 2018 22:26:14 +0300
Subject: [PATCH] X zwrk: Make sure we warm up connections to all NEO storages
 when cluster is partitioned

Previously the warm-up loaded only one object. That worked for the
single-storage case, but with multiple NEO storage nodes a single load
reaches only one of them, leaving connections to the others cold.
---
 go/neo/t/tzodb.go | 116 ++++++++++++++++++++++++++++------------------
 1 file changed, 70 insertions(+), 46 deletions(-)

diff --git a/go/neo/t/tzodb.go b/go/neo/t/tzodb.go
index d0f2ae96..f0419484 100644
--- a/go/neo/t/tzodb.go
+++ b/go/neo/t/tzodb.go
@@ -307,12 +307,12 @@ func zwrk(ctx context.Context, url string, nwrk int, h hasher, bench, check stri
 		return err
 	}
 
-	// parallel phase
-	defer task.Runningf(&ctx, "zwrk-%d", nwrk)(&err)
-	ctx0 := ctx
-
 	// establish nwrk connections and warm them up
-	storv := make([]zodb.IStorage, nwrk)
+	storv, err := zwrkPreconnect(ctx, url, at, nwrk)
+	if err != nil {
+		return err
+	}
+
 	defer func() {
 		for _, stor := range storv {
 			if stor != nil {
@@ -322,49 +322,11 @@ func zwrk(ctx context.Context, url string, nwrk int, h hasher, bench, check stri
 		}
 	}()
 
-	wg, ctx := errgroup.WithContext(ctx0)
-	for i := 0; i < nwrk; i++ {
-		i := i
-		wg.Go(func() error {
-			// open storage without caching - we need to take
-			// latency of every request into account, and a cache
-			// could be inhibiting (e.g. making significantly
-			// lower) it for some requests.
-			var opts = zodb.OpenOptions{
-				ReadOnly: true,
-				NoCache:  true,
-			}
-			stor, err := zodb.OpenStorage(ctx, url, &opts)
-			if err != nil {
-				return err
-			}
-			storv[i] = stor
-
-			// ping storage to warm-up the connection
-			// (in case of NEO LastTid connects to master and Load - to storage)
-			_, err = stor.LastTid(ctx)
-			if err != nil {
-				return err
-			}
-
-			buf, _, err := stor.Load(ctx, zodb.Xid{Oid: 0, At: at})
-			buf.XRelease()
-			if err != nil {
-				return err
-			}
-
-			return nil
-		})
-	}
-
-	err = wg.Wait()
-	if err != nil {
-		return err
-	}
 
 	// benchmark parallel loads
+	defer task.Runningf(&ctx, "zwrk-%d/bench", nwrk)(&err)
 	r := testing.Benchmark(func (b *testing.B) {
-		wg, ctx = errgroup.WithContext(ctx0)
+		wg, ctx := errgroup.WithContext(ctx)
 		var n int64
 
 		for i := 0; i < nwrk; i++ {
@@ -419,9 +381,71 @@ func zwrk(ctx context.Context, url string, nwrk int, h hasher, bench, check stri
 	return nil
 }
 
+// zwrkPreconnect opens nwrk read-only, uncached connections to url in parallel and warms them up; on error every opened storage is closed.
+func zwrkPreconnect(ctx context.Context, url string, at zodb.Tid, nwrk int) (_ []zodb.IStorage, err error) {
+	defer task.Runningf(&ctx, "zwrk-%d/preconnect", nwrk)(&err)
+	storv := make([]zodb.IStorage, nwrk)
+
+	wg, ctx := errgroup.WithContext(ctx)
+	for i := 0; i < nwrk; i++ {
+		i := i
+		wg.Go(func() error {
+			// open storage without caching - we need to take the
+			// latency of every request into account, and a cache
+			// could be distorting it (e.g. making it significantly
+			// lower) for some requests.
+			var opts = zodb.OpenOptions{
+				ReadOnly: true,
+				NoCache:  true,
+			}
+			stor, err := zodb.OpenStorage(ctx, url, &opts)
+			if err != nil {
+				return err
+			}
+			storv[i] = stor
+
+			// ping storage to warm up the connection
+			// ( in case of NEO, LastTid connects to the master and Load
+			//   connects to a storage )
+			_, err = stor.LastTid(ctx)
+			if err != nil {
+				return err
+			}
+
+			// load the first several objects to warm up storage connections:
+			// we need to load several objects so that, in case of a
+			// NEO cluster with several storage nodes, we warm up
+			// connections to all of them.
+			//
+			// FIXME 16 hardcoded
+			for oid := zodb.Oid(0); oid < 16; oid++ {
+				buf, _, err := stor.Load(ctx, zodb.Xid{Oid: oid, At: at})
+				buf.XRelease()
+				if err != nil {
+					return err
+				}
+			}
+
+			return nil
+		})
+	}
+
+	err = wg.Wait()
+	if err != nil {
+		for _, stor := range storv {
+			if stor != nil {
+				stor.Close()	// XXX lclose
+			}
+		}
+		return nil, err
+	}
+
+	return storv, nil
+}
 
+// zwrkPrepare serially reads all objects and computes per-object crc32.
 func zwrkPrepare(ctx context.Context, url string, h hasher, check string) (at zodb.Tid, objcheckv []uint32, err error) {
-	defer task.Running(&ctx, "zwrk-prepare")(&err)
+	defer task.Running(&ctx, "zwrk/prepare")(&err)
 
 	stor, err := zodb.OpenStorage(ctx, url, &zodb.OpenOptions{ReadOnly: true})
 	if err != nil {
-- 
2.30.9