Commit bce3fadc authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c9d2d4e8
...@@ -109,6 +109,8 @@ func masterStartReady(where string, ready bool) *eventMStartReady { ...@@ -109,6 +109,8 @@ func masterStartReady(where string, ready bool) *eventMStartReady {
return &eventMStartReady{where, ready} return &eventMStartReady{where, ready}
} }
// TODO eventPartTab
// ---- events routing ---- // ---- events routing ----
// EventRouter implements NEO-specific routing of events to trace test channels. // EventRouter implements NEO-specific routing of events to trace test channels.
...@@ -117,9 +119,14 @@ type EventRouter struct { ...@@ -117,9 +119,14 @@ type EventRouter struct {
defaultq *tsync.SyncChan defaultq *tsync.SyncChan
// events specific to particular node - e.g. node starts listening // events specific to particular node - e.g. node starts listening,
// state on that node changes, etc...
byNode map[string /*host*/]*tsync.SyncChan byNode map[string /*host*/]*tsync.SyncChan
// state on host changes. Takes precendece over byNode.
byState map[string /*host*/]*tsync.SyncChan
// event on a-b link
byLink map[string /*host-host*/]*linkDst byLink map[string /*host-host*/]*linkDst
connected map[string /*addr-addr*/]bool connected map[string /*addr-addr*/]bool
...@@ -129,6 +136,7 @@ func NewEventRouter() *EventRouter { ...@@ -129,6 +136,7 @@ func NewEventRouter() *EventRouter {
return &EventRouter{ return &EventRouter{
defaultq: tsync.NewSyncChan("default"), defaultq: tsync.NewSyncChan("default"),
byNode: make(map[string]*tsync.SyncChan), byNode: make(map[string]*tsync.SyncChan),
byState: make(map[string]*tsync.SyncChan),
byLink: make(map[string]*linkDst), byLink: make(map[string]*linkDst),
connected: make(map[string]bool), connected: make(map[string]bool),
} }
...@@ -140,6 +148,9 @@ func (r *EventRouter) AllRoutes() []*tsync.SyncChan { ...@@ -140,6 +148,9 @@ func (r *EventRouter) AllRoutes() []*tsync.SyncChan {
for _, dst := range r.byNode { for _, dst := range r.byNode {
rtset[dst] = 1 rtset[dst] = 1
} }
for _, dst := range r.byState {
rtset[dst] = 1
}
for _, ldst := range r.byLink { for _, ldst := range r.byLink {
rtset[ldst.a] = 1 rtset[ldst.a] = 1
rtset[ldst.b] = 1 rtset[ldst.b] = 1
...@@ -232,13 +243,13 @@ func (r *EventRouter) Route(event interface{}) (dst *tsync.SyncChan) { ...@@ -232,13 +243,13 @@ func (r *EventRouter) Route(event interface{}) (dst *tsync.SyncChan) {
// state changes // state changes
case *eventNodeTab: case *eventNodeTab:
dst = r.byNode[ev.Where] dst = r.routeState(ev.Where)
case *eventClusterState: case *eventClusterState:
dst = r.byNode[ev.Where] dst = r.routeState(ev.Where)
case *eventMStartReady: case *eventMStartReady:
dst = r.byNode[ev.Where] dst = r.routeState(ev.Where)
} }
if dst == nil { if dst == nil {
...@@ -247,7 +258,19 @@ func (r *EventRouter) Route(event interface{}) (dst *tsync.SyncChan) { ...@@ -247,7 +258,19 @@ func (r *EventRouter) Route(event interface{}) (dst *tsync.SyncChan) {
return dst return dst
} }
// BranchNode branches events corresponsing to state on host. // routeState routes event corresponding to state change on host
func (r *EventRouter) routeState(host string) (dst *tsync.SyncChan) {
// lookup dst by state rules
dst = r.byState[host]
if dst != nil {
return dst
}
// fallback to by node rules
return r.byNode[host]
}
// BranchNode branches events corresponding to host.
func (r *EventRouter) BranchNode(host string, dst *tsync.SyncChan) { func (r *EventRouter) BranchNode(host string, dst *tsync.SyncChan) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
...@@ -259,11 +282,23 @@ func (r *EventRouter) BranchNode(host string, dst *tsync.SyncChan) { ...@@ -259,11 +282,23 @@ func (r *EventRouter) BranchNode(host string, dst *tsync.SyncChan) {
r.byNode[host] = dst r.byNode[host] = dst
} }
// BranchState branches events corresponding to state changes on host.
func (r *EventRouter) BranchState(host string, dst *tsync.SyncChan) {
r.mu.Lock()
defer r.mu.Unlock()
if _, already := r.byState[host]; already {
panic(fmt.Sprintf("event router: state on node %q already branched", host))
}
r.byState[host] = dst
}
// BranchLink branches events corresponding to link in between a-b. // BranchLink branches events corresponding to link in between a-b.
// //
// Link should be of "a-b" form with b listening and a dialing. // Link should be of "a-b" form with b listening and a dialing.
// //
// Event with networkiing cause root coming from a go to dsta, and with // Event with networking cause root coming from a go to dsta, and with
// networking cause root coming from b - go to dstb. // networking cause root coming from b - go to dstb.
func (r *EventRouter) BranchLink(link string, dsta, dstb *tsync.SyncChan) { func (r *EventRouter) BranchLink(link string, dsta, dstb *tsync.SyncChan) {
r.mu.Lock() r.mu.Lock()
...@@ -467,11 +502,11 @@ func TestMasterStorage(t *testing.T) { ...@@ -467,11 +502,11 @@ func TestMasterStorage(t *testing.T) {
// XXX C-S // XXX C-S
rt.BranchNode("m", cM) rt.BranchNode("m", cM)
rt.BranchNode("s", cS)
rt.BranchLink("s-m", cSM, cMS) rt.BranchLink("s-m", cSM, cMS)
rt.BranchLink("c-m", cCM, cMC) rt.BranchLink("c-m", cCM, cMC)
//rt.BranchNode("s", cMS) // state on S is controlled by M rt.BranchState("s", cMS) // state on S is controlled by M
rt.BranchNode("s", cS) // XXX <- no rt.BranchState("c", cMC) // state on C is controlled by M
rt.BranchNode("c", cMC) // state on C is controlled by M
// cluster nodes // cluster nodes
M := NewMaster("abc1", ":1", Mhost) M := NewMaster("abc1", ":1", Mhost)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment