Commit bbfb5006 authored by Kirill Smelkov's avatar Kirill Smelkov

X zwrk: Make sure we warm up connections to all NEO storages when cluster is partitioned

Previous code used to warm up loading only one object which worked ok
for 1 storage case but does not work well for multiple NEO storage
nodes.
parent e0d875bc
...@@ -307,12 +307,12 @@ func zwrk(ctx context.Context, url string, nwrk int, h hasher, bench, check stri ...@@ -307,12 +307,12 @@ func zwrk(ctx context.Context, url string, nwrk int, h hasher, bench, check stri
return err return err
} }
// parallel phase
defer task.Runningf(&ctx, "zwrk-%d", nwrk)(&err)
ctx0 := ctx
// establish nwrk connections and warm them up // establish nwrk connections and warm them up
storv := make([]zodb.IStorage, nwrk) storv, err := zwrkPreconnect(ctx, url, at, nwrk)
if err != nil {
return err
}
defer func() { defer func() {
for _, stor := range storv { for _, stor := range storv {
if stor != nil { if stor != nil {
...@@ -322,49 +322,11 @@ func zwrk(ctx context.Context, url string, nwrk int, h hasher, bench, check stri ...@@ -322,49 +322,11 @@ func zwrk(ctx context.Context, url string, nwrk int, h hasher, bench, check stri
} }
}() }()
wg, ctx := errgroup.WithContext(ctx0)
for i := 0; i < nwrk; i++ {
i := i
wg.Go(func() error {
// open storage without caching - we need to take
// latency of every request into account, and a cache
// could be inhibiting (e.g. making significantly
// lower) it for some requests.
var opts = zodb.OpenOptions{
ReadOnly: true,
NoCache: true,
}
stor, err := zodb.OpenStorage(ctx, url, &opts)
if err != nil {
return err
}
storv[i] = stor
// ping storage to warm-up the connection
// (in case of NEO LastTid connects to master and Load - to storage)
_, err = stor.LastTid(ctx)
if err != nil {
return err
}
buf, _, err := stor.Load(ctx, zodb.Xid{Oid: 0, At: at})
buf.XRelease()
if err != nil {
return err
}
return nil
})
}
err = wg.Wait()
if err != nil {
return err
}
// benchmark parallel loads // benchmark parallel loads
defer task.Runningf(&ctx, "zwrk-%d/bench", nwrk)(&err)
r := testing.Benchmark(func (b *testing.B) { r := testing.Benchmark(func (b *testing.B) {
wg, ctx = errgroup.WithContext(ctx0) wg, ctx := errgroup.WithContext(ctx)
var n int64 var n int64
for i := 0; i < nwrk; i++ { for i := 0; i < nwrk; i++ {
...@@ -419,9 +381,71 @@ func zwrk(ctx context.Context, url string, nwrk int, h hasher, bench, check stri ...@@ -419,9 +381,71 @@ func zwrk(ctx context.Context, url string, nwrk int, h hasher, bench, check stri
return nil return nil
} }
// zwrkPreconnect establishes nwrk connections and warms them up.
func zwrkPreconnect(ctx context.Context, url string, at zodb.Tid, nwrk int) (_ []zodb.IStorage, err error) {
defer task.Runningf(&ctx, "zwrk-%d/preconnect", nwrk)(&err)
storv := make([]zodb.IStorage, nwrk)
wg, ctx := errgroup.WithContext(ctx)
for i := 0; i < nwrk; i++ {
i := i
wg.Go(func() error {
// open storage without caching - we need to take
// latency of every request into account, and a cache
// could be inhibiting (e.g. making significantly
// lower) it for some requests.
var opts = zodb.OpenOptions{
ReadOnly: true,
NoCache: true,
}
stor, err := zodb.OpenStorage(ctx, url, &opts)
if err != nil {
return err
}
storv[i] = stor
// storage to warm-up the connection
// ( in case of NEO LastTid connects to master and Load
// - to a storage )
_, err = stor.LastTid(ctx)
if err != nil {
return err
}
// load several first objects to warm up storages connection
// we need to load several objects so that in case of
// NEO cluster with several storage nodes we warm-up
// connections to them all.
//
// FIXME 16 hardcoded
for oid := zodb.Oid(0); oid < 16; oid++ {
buf, _, err := stor.Load(ctx, zodb.Xid{Oid: oid, At: at})
buf.XRelease()
if err != nil {
return err
}
}
return nil
})
}
err = wg.Wait()
if err != nil {
for _, stor := range storv {
if stor != nil {
stor.Close() // XXX lclose
}
}
return nil, err
}
return storv, nil
}
// zwrkPrepare serially reads all objects and computes per-object crc32.
func zwrkPrepare(ctx context.Context, url string, h hasher, check string) (at zodb.Tid, objcheckv []uint32, err error) { func zwrkPrepare(ctx context.Context, url string, h hasher, check string) (at zodb.Tid, objcheckv []uint32, err error) {
defer task.Running(&ctx, "zwrk-prepare")(&err) defer task.Running(&ctx, "zwrk/prepare")(&err)
stor, err := zodb.OpenStorage(ctx, url, &zodb.OpenOptions{ReadOnly: true}) stor, err := zodb.OpenStorage(ctx, url, &zodb.OpenOptions{ReadOnly: true})
if err != nil { if err != nil {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment