Commit b09ad0e2 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent bfb2a7ee
...@@ -34,7 +34,6 @@ import ( ...@@ -34,7 +34,6 @@ import (
"github.com/kylelemons/godebug/pretty" "github.com/kylelemons/godebug/pretty"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/storage"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
...@@ -49,53 +48,6 @@ import ( ...@@ -49,53 +48,6 @@ import (
"time" "time"
) )
// test-wrapper around Storage - to automatically listen by address, not provided listener.
type tStorage struct {
*Storage
serveAddr string
}
func tNewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, back storage.Backend) *tStorage {
return &tStorage{
Storage: NewStorage(clusterName, masterAddr, net, back),
serveAddr: serveAddr,
}
}
func (s *tStorage) Run(ctx context.Context) error {
l, err := s.node.Net.Listen(s.serveAddr)
if err != nil {
return err
}
return s.Storage.Run(ctx, l)
}
// test-wrapper around Master - to automatically listen by address, not provided listener.
type tMaster struct {
*Master
serveAddr string
}
func tNewMaster(clusterName, serveAddr string, net xnet.Networker) *tMaster {
return &tMaster{
Master: NewMaster(clusterName, net),
serveAddr: serveAddr,
}
}
func (m *tMaster) Run(ctx context.Context) error {
l, err := m.node.Net.Listen(m.serveAddr)
if err != nil {
return err
}
return m.Master.Run(ctx, l)
}
// ----------------------------------------
/* /*
func TestMasterStorage0(t0 *testing.T) { func TestMasterStorage0(t0 *testing.T) {
...@@ -107,7 +59,8 @@ func TestMasterStorage0(t0 *testing.T) { ...@@ -107,7 +59,8 @@ func TestMasterStorage0(t0 *testing.T) {
C := t.NewClient("c") C := t.NewClient("c")
tM := t.Checker("m.main") tM := t.Checker("m")
tS := t.Checker("s")
tMS := t.Checker("m-s") tMS := t.Checker("m-s")
tSM := t.Checker("s-m") tSM := t.Checker("s-m")
......
...@@ -21,23 +21,43 @@ package neo ...@@ -21,23 +21,43 @@ package neo
// infrastructure for creating NEO test clusters. // infrastructure for creating NEO test clusters.
import ( import (
"context"
"fmt"
"sync"
"testing" "testing"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xnet/pipenet" "lab.nexedi.com/kirr/go123/xnet/pipenet"
"lab.nexedi.com/kirr/neo/go/xcommon/xtracing/tracetest"
"lab.nexedi.com/kirr/neo/go/neo/storage"
) )
// TestCluster ... XXX // TestCluster ... XXX
type TestCluster struct { type TestCluster struct {
name string name string
net *pipenet.Network // XXX -> lo network *pipenet.Network // XXX -> lo
gotracer *TraceCollector // XXX -> GoTracer gotracer *TraceCollector // XXX -> GoTracer
//tpy *PyTracer //tpy *PyTracer
erouter *EventRouter
edispatch *tracetest.EventDispatcher
tabMu sync.Mutex
nodeTab map[string/*node*/]*tNode
checkTab map[string/*node*/]*tracetest.EventChecker
ttest testing.TB // original testing env this cluster was created at ttest testing.TB // original testing env this cluster was created at
} }
// tNode represents information about a test node ... XXX
type tNode struct {
net xnet.Networker
}
// XXX stub // XXX stub
type ITestMaster interface {} type ITestMaster interface {}
type ITestStorage interface {} type ITestStorage interface {}
...@@ -49,11 +69,23 @@ type ITestClient interface {} ...@@ -49,11 +69,23 @@ type ITestClient interface {}
// //
// XXX defer t.Stop() // XXX defer t.Stop()
func NewTestCluster(ttest testing.TB, name string) *TestCluster { func NewTestCluster(ttest testing.TB, name string) *TestCluster {
return &TestCluster{ t := &TestCluster{
name: name, name: name,
network: pipenet.New("testnet"), // test network
nodeTab: make(map[string]*tNode),
checkTab: make(map[string]*tracetest.EventChecker),
//... XXX //... XXX
ttest: ttest, ttest: ttest,
} }
t.erouter = NewEventRouter()
t.edispatch = tracetest.NewEventDispatcher(t.erouter)
t.gotracer = NewTraceCollector(t.edispatch)
t.gotracer.Attach()
return t
} }
// Stop stops the cluster. // Stop stops the cluster.
...@@ -69,33 +101,76 @@ func (t *TestCluster) Stop() error { ...@@ -69,33 +101,76 @@ func (t *TestCluster) Stop() error {
return nil return nil
} }
// NewMaster creates new master on node. // Checker returns tracechecker corresponding to name.
// //
// The master will be accepting incoming connections at node:1. // name might be of "node" or "node1-node2" kind. XXX more text
// The node must be not yet existing and will be dedicated to the created master fully. XXX // node or node1/node2 must be already registered.
func (t *TestCluster) Checker(name string) *tracetest.EventChecker {
t.tabMu.Lock()
defer t.tabMu.Unlock()
c, ok := t.checkTab[name]
if !ok {
panic(fmt.Sprintf("test cluster: no %q checker", name))
}
return c
}
// registerNewNode registers new node with given name.
// //
// XXX error of creating py process? // the node is registered in .nodeTab and .checkTab is populated ... XXX
func (t *TestCluster) NewMaster(node string) ITestMaster { //
//... XXX // the node must not be registered before.
func (t *TestCluster) registerNewNode(name string) *tNode {
t.tabMu.Lock()
defer t.tabMu.Unlock()
// check node not yet registered
if _, already := t.nodeTab[name]; already {
panic(fmt.Sprintf("test cluster: node %q registered twice", name))
}
// XXX check name is unique host name - not already registered // tracechecker for events on node
// XXX set M clock to vclock.monotime c1 := tracetest.NewSyncChan(name) // trace of events local to node
t.erouter.BranchNode(name, c1)
t.checkTab[name] = tracetest.NewEventChecker(t.ttest, t.edispatch, c1)
// tracetest.NewSyncChan("m.main") // tracecheckers for events on links of all node1-node2 pairs
// foreach node1,node2: for name2 := range t.nodeTab {
// tracetest.NewChan("node1-node2") // trace of events with cause root being n1 -> n2 send // trace of events with cause root being node1 -> node2 send
// tracetest.NewChan("node2-node1") // trace of events with cause root being n2 -> n1 send c12 := tracetest.NewSyncChan(name + "-" + name2)
// ----//---- node2 -> node1 send
c21 := tracetest.NewSyncChan(name2 + "-" + name)
// for each created tracetest.Chan -> create tracetest.EventChecker t12 := tracetest.NewEventChecker(t.ttest, t.edispatch, c12)
t21 := tracetest.NewEventChecker(t.ttest, t.edispatch, c21)
//rt.BranchNode("m", cM) t.erouter.BranchLink(name + "-" + name2, c12, c21)
//rt.BranchState("m", t.checkTab[name + "-" + name2] = t12
//rt.BranchLink("n1-n2", ..., ...) t.checkTab[name2 + "-" + name] = t21
}
// XXX state on S,C is controlled by M: node := &tNode{}
// rt.BranchState("s", cMS) node.net = xnet.NetTrace(t.network.Host(name), t.gotracer)
return nil t.nodeTab[name] = node
return node
}
// NewMaster creates new master on node name.
//
// The master will be accepting incoming connections at node:1.
// The node must be not yet existing and will be dedicated to the created master fully. XXX
//
// XXX error of creating py process?
func (t *TestCluster) NewMaster(name string) ITestMaster {
node := t.registerNewNode(name)
//... XXX
m := tNewMaster(t.name, ":1", node.net)
return m
} }
func (t *TestCluster) NewStorage(node string) ITestStorage { func (t *TestCluster) NewStorage(node string) ITestStorage {
...@@ -105,3 +180,55 @@ func (t *TestCluster) NewStorage(node string) ITestStorage { ...@@ -105,3 +180,55 @@ func (t *TestCluster) NewStorage(node string) ITestStorage {
func (t *TestCluster) NewClient(node string) ITestClient { func (t *TestCluster) NewClient(node string) ITestClient {
panic("TODO") panic("TODO")
} }
// test-wrapper around Storage - to automatically listen by address, not provided listener.
type tStorage struct {
*Storage
serveAddr string
}
func tNewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, back storage.Backend) *tStorage {
return &tStorage{
Storage: NewStorage(clusterName, masterAddr, net, back),
serveAddr: serveAddr,
}
}
func (s *tStorage) Run(ctx context.Context) error {
l, err := s.node.Net.Listen(s.serveAddr)
if err != nil {
return err
}
return s.Storage.Run(ctx, l)
}
// test-wrapper around Master
//
// - automatically listens by address, not provided listener.
// - uses virtual clock.
type tMaster struct {
*Master
serveAddr string
vclock vclock
}
func tNewMaster(clusterName, serveAddr string, net xnet.Networker) *tMaster {
m := &tMaster{
Master: NewMaster(clusterName, net),
serveAddr: serveAddr,
}
m.Master.monotime = m.vclock.monotime
return m
}
func (m *tMaster) Run(ctx context.Context) error {
l, err := m.node.Net.Listen(m.serveAddr)
if err != nil {
return err
}
return m.Master.Run(ctx, l)
}
...@@ -338,9 +338,12 @@ func (r *EventRouter) BranchState(host string, dst *tracetest.SyncChan) { ...@@ -338,9 +338,12 @@ func (r *EventRouter) BranchState(host string, dst *tracetest.SyncChan) {
// BranchLink branches events corresponding to link in between a-b. // BranchLink branches events corresponding to link in between a-b.
// //
// Link should be of "a-b" form with b listening and a dialing. // Link should be of "a-b" form with b listening and a dialing.
// XXX do we need to require that b listens / a dials?
// //
// Event with networking cause root coming from a go to dsta, and with // Event with networking cause root coming from a go to dsta, and with
// networking cause root coming from b - go to dstb. // networking cause root coming from b - go to dstb.
//
// XXX extend to support multiple ongoing streams (e.g. prefetch) ?
func (r *EventRouter) BranchLink(link string, dsta, dstb *tracetest.SyncChan) { func (r *EventRouter) BranchLink(link string, dsta, dstb *tracetest.SyncChan) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment