Commit fd1140a6 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 9d3a80ae
......@@ -37,10 +37,10 @@ import (
//
// UUID -> *Node
//
// mapping associating node uuid with information about a node.
// mapping listing known nodes and associating their uuid with information
// about a node.
//
// Primary use-case is that master maintains such table and provides it to
// its peers to know each other:
// Master maintains such table and provides it to its peers to know each other:
//
// - Primary Master view of cluster.
// - M tracks changes to nodeTab as nodes appear (connected to M) and go (disconnected from M).
......
......@@ -74,17 +74,19 @@ type eventNeoSend struct {
// event: cluster state changed
type eventClusterState struct {
Ptr *neo.ClusterState // pointer to variable which holds the state
//Ptr *neo.ClusterState // pointer to variable which holds the state
Where string
State neo.ClusterState
}
func clusterState(cs *neo.ClusterState, v neo.ClusterState) *eventClusterState {
return &eventClusterState{cs, v}
func clusterState(where string, v neo.ClusterState) *eventClusterState {
return &eventClusterState{where, v}
}
// event: nodetab entry changed
type eventNodeTab struct {
NodeTab unsafe.Pointer // *neo.NodeTable XXX not to noise test diff
//NodeTab unsafe.Pointer // *neo.NodeTable XXX not to noise test diff
Where string // host of running node XXX ok? XXX -> TabName?
NodeInfo neo.NodeInfo
}
......@@ -147,20 +149,26 @@ func hostport(addr string) (host string, port string) {
return host, port
}
func (r *EventRouter) Route(event interface{}) *tsync.SyncChan {
func (r *EventRouter) Route(event interface{}) (dst *tsync.SyncChan) {
r.mu.Lock()
defer r.mu.Unlock()
switch ev := event.(type) {
case *eventNetListen:
host, _ := hostport(ev.Laddr)
dst := r.byNode[host]
if dst != nil {
return dst
}
dst = r.byNode[host]
case *eventNodeTab:
dst = r.byNode[ev.Where]
case *eventClusterState:
dst = r.byNode[ev.Where]
}
return r.defaultq // use default XXX or better nil?
if dst == nil {
dst = r.defaultq
}
return dst
}
func (r *EventRouter) BranchNode(host string, dst *tsync.SyncChan) {
......@@ -180,10 +188,19 @@ func (r *EventRouter) BranchNode(host string, dst *tsync.SyncChan) {
type TraceCollector struct {
pg *tracing.ProbeGroup
d *tsync.EventDispatcher
nodeTab2Owner map[*neo.NodeTable]string
clusterState2Owner map[*neo.ClusterState]string
}
func NewTraceCollector(dispatch *tsync.EventDispatcher) *TraceCollector {
return &TraceCollector{pg: &tracing.ProbeGroup{}, d: dispatch}
return &TraceCollector{
pg: &tracing.ProbeGroup{},
d: dispatch,
nodeTab2Owner: make(map[*neo.NodeTable]string),
clusterState2Owner: make(map[*neo.ClusterState]string),
}
}
//trace:import "lab.nexedi.com/kirr/neo/go/neo"
......@@ -203,6 +220,21 @@ func (t *TraceCollector) Detach() {
t.pg.Done()
}
// RegisterNode lets the tracer know ptr-to-node-state -> node name relation.
//
// This way it can translate e.g. *NodeTable -> owner node name when creating
// corresponding event.
//
// name is recorded as the owner of both node.NodeTab and &node.ClusterState;
// traceNode and traceClusterState look these pointers up to fill in the
// Where field of the events they dispatch.
func (t *TraceCollector) RegisterNode(node *neo.NodeApp, name string) {
// the owner maps are updated while holding tracing.Lock
tracing.Lock()
defer tracing.Unlock()
// XXX verify there are no duplicate names
// XXX verify the same pointer is not registered twice
t.nodeTab2Owner[node.NodeTab] = name
t.clusterState2Owner[&node.ClusterState] = name
}
func (t *TraceCollector) TraceNetConnect(ev *xnet.TraceConnect) { t.d.Dispatch(ev) }
func (t *TraceCollector) TraceNetListen(ev *xnet.TraceListen) {
......@@ -216,11 +248,15 @@ func (t *TraceCollector) traceNeoMsgSendPre(l *neo.NodeLink, connID uint32, msg
}
func (t *TraceCollector) traceClusterState(cs *neo.ClusterState) {
t.d.Dispatch(&eventClusterState{cs, *cs})
//t.d.Dispatch(&eventClusterState{cs, *cs})
where := t.clusterState2Owner[cs]
t.d.Dispatch(&eventClusterState{where, *cs})
}
func (t *TraceCollector) traceNode(nt *neo.NodeTable, n *neo.Node) {
t.d.Dispatch(&eventNodeTab{unsafe.Pointer(nt), n.NodeInfo})
//t.d.Dispatch(&eventNodeTab{unsafe.Pointer(nt), n.NodeInfo})
where := t.nodeTab2Owner[nt]
t.d.Dispatch(&eventNodeTab{where, n.NodeInfo})
}
func (t *TraceCollector) traceMasterStartReady(m *Master, ready bool) {
......@@ -297,9 +333,16 @@ func TestMasterStorage(t *testing.T) {
}
// shortcut for nodetab change
node := func(x *neo.NodeApp, laddr string, typ neo.NodeType, num int32, state neo.NodeState, idtime neo.IdTime) *eventNodeTab {
// node := func(x *neo.NodeApp, laddr string, typ neo.NodeType, num int32, state neo.NodeState, idtime neo.IdTime) *eventNodeTab {
// return &eventNodeTab{
// NodeTab: unsafe.Pointer(x.NodeTab),
// NodeInfo: nodei(laddr, typ, num, state, idtime),
// }
// }
node := func(where string, laddr string, typ neo.NodeType, num int32, state neo.NodeState, idtime neo.IdTime) *eventNodeTab {
return &eventNodeTab{
NodeTab: unsafe.Pointer(x.NodeTab),
Where: where,
NodeInfo: nodei(laddr, typ, num, state, idtime),
}
}
......@@ -316,11 +359,19 @@ func TestMasterStorage(t *testing.T) {
rt.BranchNode("m", cm)
rt.BranchNode("s", cs)
// cluster nodes
M := NewMaster("abc1", ":1", Mhost)
zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs")
S := NewStorage("abc1", "m:1", ":1", Shost, zstor)
// register them
tracer.RegisterNode(M.node, "m") // XXX better Mhost.Name()
tracer.RegisterNode(S.node, "s")
gwg := &errgroup.Group{}
// start master
Mclock := &vclock{}
M := NewMaster("abc1", ":1", Mhost)
M.monotime = Mclock.monotime
Mctx, Mcancel := context.WithCancel(bg)
gox(gwg, func() {
......@@ -330,8 +381,6 @@ func TestMasterStorage(t *testing.T) {
})
// start storage
zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs")
S := NewStorage("abc1", "m:1", ":1", Shost, zstor)
Sctx, Scancel := context.WithCancel(bg)
gox(gwg, func() {
err := S.Run(Sctx)
......@@ -342,8 +391,8 @@ func TestMasterStorage(t *testing.T) {
// M starts listening
tm.Expect(netlisten("m:1"))
tm.Expect(node(M.node, "m:1", neo.MASTER, 1, neo.RUNNING, neo.IdTimeNone))
tm.Expect(clusterState(&M.node.ClusterState, neo.ClusterRecovering))
tm.Expect(node("m", "m:1", neo.MASTER, 1, neo.RUNNING, neo.IdTimeNone))
tm.Expect(clusterState("m", neo.ClusterRecovering))
// TODO create C; C tries connect to master - rejected ("not yet operational")
......@@ -360,7 +409,7 @@ func TestMasterStorage(t *testing.T) {
IdTime: neo.IdTimeNone,
}))
g.Expect(node(M.node, "s:1", neo.STORAGE, 1, neo.PENDING, 0.01))
g.Expect(node("m", "s:1", neo.STORAGE, 1, neo.PENDING, 0.01))
g.Expect(conntx("m:2", "s:2", 1, &neo.AcceptIdentification{
NodeType: neo.MASTER,
......
......@@ -109,6 +109,8 @@ func (m *SyncMsg) Ack() {
// NewSyncChan creates new SyncChan channel.
//
// The returned channel carries the given name and uses an unbuffered
// queue of *SyncMsg.
func NewSyncChan(name string) *SyncChan {
// XXX somehow avoid channels with duplicate names
// (only allow to create named channels from under dispatcher?)
return &SyncChan{msgq: make(chan *SyncMsg), name: name}
}
......@@ -232,8 +234,11 @@ func (evc *EventChecker) deadlock(eventp interface{}) {
default:
}
// XXX panic triggering disabled because if sender panics we have no chance to continue
// TODO retest this
// in any case close channel where future Sends may arrive so that they will panic too.
close(dst.msgq)
//close(dst.msgq)
}
// order channels by name
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment