Commit 54137dd2 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 030b0056
...@@ -1033,18 +1033,16 @@ func (m *Master) disconnectPeer(ctx context.Context, peer *_MasteredPeer) { ...@@ -1033,18 +1033,16 @@ func (m *Master) disconnectPeer(ctx context.Context, peer *_MasteredPeer) {
// updateNodeTab = .nodeTab.Update + send δnodeTab to all subscribers. // updateNodeTab = .nodeTab.Update + send δnodeTab to all subscribers.
// must be called from main. // must be called from main.
// XXX place
func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xneo.PeerNode { func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xneo.PeerNode {
log.Infof(ctx, "nodeTab[%s] <- %s", nodeInfo.NID, nodeInfo)
node := m.node.State.NodeTab.Update(nodeInfo) node := m.node.State.NodeTab.Update(nodeInfo)
m.notifyAll(ctx, &_ΔNodeTab{nodeInfo}) m.notifyAll(ctx, &_ΔNodeTab{nodeInfo})
return node return node
} }
// XXX place // updateNodeState updates nodeTab[peer.nid].state = state
// XXX doc func (m *Master) updateNodeState(ctx context.Context, peer *xneo.PeerNode, state proto.NodeState) {
func (m *Master) updateNodeState(ctx context.Context, node *xneo.PeerNode, state proto.NodeState) { nodei := peer.NodeInfo
nodei := node.NodeInfo
// XXX skip if .State == state ?
nodei.State = state nodei.State = state
m.updateNodeTab(ctx, nodei) m.updateNodeTab(ctx, nodei)
} }
......
...@@ -69,7 +69,6 @@ type NodeTable struct { ...@@ -69,7 +69,6 @@ type NodeTable struct {
localNode *Node localNode *Node
nodev []*PeerNode // all nodes nodev []*PeerNode // all nodes
// notifyv []chan proto.NodeInfo // subscribers
} }
//trace:event traceNodeChanged(nt *NodeTable, n *PeerNode) //trace:event traceNodeChanged(nt *NodeTable, n *PeerNode)
...@@ -138,8 +137,6 @@ func (nt *NodeTable) Update(nodeInfo proto.NodeInfo) *PeerNode { ...@@ -138,8 +137,6 @@ func (nt *NodeTable) Update(nodeInfo proto.NodeInfo) *PeerNode {
// XXX close link if .state becomes DOWN ? // XXX close link if .state becomes DOWN ?
traceNodeChanged(nt, node) traceNodeChanged(nt, node)
// nt.notify(node.NodeInfo) XXX kill
return node return node
} }
...@@ -156,16 +153,6 @@ func (nt *NodeTable) StorageList() []*PeerNode { ...@@ -156,16 +153,6 @@ func (nt *NodeTable) StorageList() []*PeerNode {
} }
/*
// XXX doc
func (n *PeerNode) SetState(state proto.NodeState) {
n.State = state
traceNodeChanged(n.nodeTab, n)
n.nodeTab.notify(n.NodeInfo)
}
*/
func (nt *NodeTable) String() string { func (nt *NodeTable) String() string {
buf := bytes.Buffer{} buf := bytes.Buffer{}
...@@ -179,90 +166,6 @@ func (nt *NodeTable) String() string { ...@@ -179,90 +166,6 @@ func (nt *NodeTable) String() string {
return buf.String() return buf.String()
} }
/*
// ---- subscription to nodetab updates ----
// XXX used only by M -> move into M?
// notify notifies NodeTable subscribers that nodeInfo was updated
func (nt *NodeTable) notify(nodeInfo proto.NodeInfo) {
// XXX rlock for .notifyv ?
for _, notify := range nt.notifyv {
notify <- nodeInfo
}
}
// Subscribe subscribes to NodeTable updates.
//
// It returns a channel via which updates will be delivered and function to unsubscribe.
//
// XXX locking: client for subscribe/unsubscribe XXX ok?
func (nt *NodeTable) Subscribe() (ch chan proto.NodeInfo, unsubscribe func()) {
ch = make(chan proto.NodeInfo) // XXX how to specify ch buf size if needed ?
nt.notifyv = append(nt.notifyv, ch)
unsubscribe = func() {
for i, c := range nt.notifyv {
if c == ch {
nt.notifyv = append(nt.notifyv[:i], nt.notifyv[i+1:]...)
close(ch)
return
}
}
panic("XXX unsubscribe not subscribed channel")
}
return ch, unsubscribe
}
// SubscribeBuffered subscribes to NodeTable updates without blocking updater.
//
// It returns a channel via which updates are delivered and unsubscribe function.
// The updates will be sent to destination in non-blocking way - if destination
// channel is not ready they will be buffered.
// It is the caller responsibility to make sure such buffering does not grow up
// to infinity - via e.g. detecting stuck connections and unsubscribing on shutdown.
//
// XXX locking: client for subscribe/unsubscribe XXX ok?
func (nt *NodeTable) SubscribeBuffered() (ch chan []proto.NodeInfo, unsubscribe func()) {
in, unsubscribe := nt.Subscribe()
ch = make(chan []proto.NodeInfo)
go func() {
var updatev []proto.NodeInfo
shutdown := false
for {
out := ch
if len(updatev) == 0 {
if shutdown {
// nothing to send and source channel closed
// -> close destination and stop
close(ch)
break
}
out = nil
}
select {
case update, ok := <-in:
if !ok {
shutdown = true
break
}
// FIXME merge updates as same node could be updated several times
updatev = append(updatev, update)
case out <- updatev:
updatev = nil
}
}
}()
return ch, unsubscribe
}
*/
// ---- peer link ---- // ---- peer link ----
// TODO review peer link dialing / setting / accepting. // TODO review peer link dialing / setting / accepting.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment