Commit 7984e2ba authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 58455af2
...@@ -110,9 +110,6 @@ type nodeLeave struct { ...@@ -110,9 +110,6 @@ type nodeLeave struct {
} }
// _MasteredPeer represents context for all tasks related to one peer driven by master. // _MasteredPeer represents context for all tasks related to one peer driven by master.
//
// .notify
// .wait (run under mainWG)
type _MasteredPeer struct { type _MasteredPeer struct {
node *xneo.PeerNode node *xneo.PeerNode
...@@ -122,6 +119,9 @@ type _MasteredPeer struct { ...@@ -122,6 +119,9 @@ type _MasteredPeer struct {
wg *xsync.WorkGroup wg *xsync.WorkGroup
cancel func() cancel func()
idReq *neonet.Request // peer's original identification request
acceptMsg *proto.AcceptIdentification // how we decided to accept it
// snapshot of nodeTab/partTab/stateCode when peer was accepted by main. // snapshot of nodeTab/partTab/stateCode when peer was accepted by main.
state0 *xneo.ClusterStateSnapshot state0 *xneo.ClusterStateSnapshot
// main -> peerWG.notify δnodeTab/δpartTab/δstateCode. // main -> peerWG.notify δnodeTab/δpartTab/δstateCode.
...@@ -1169,14 +1169,15 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer, ...@@ -1169,14 +1169,15 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
// create peer with nodeTab/partTab snapshot to push to accepted node // create peer with nodeTab/partTab snapshot to push to accepted node
// and subscribe it for updates. // and subscribe it for updates.
peerCtx, peerCancel := context.WithCancel(m.runCtx) peerCtx, peerCancel := context.WithCancel(m.runCtx)
// XXX add accept.NID to peerCtx task?
peer = &_MasteredPeer{ peer = &_MasteredPeer{
node: node, node: node,
wg: xsync.NewWorkGroup(peerCtx), wg: xsync.NewWorkGroup(peerCtx),
cancel: peerCancel, cancel: peerCancel,
state0: m.node.State.Snapshot(), // XXX don't need .state0 if vvv is not moved to .acceptPeer idReq: n.req,
acceptMsg: accept,
state0: m.node.State.Snapshot(),
// TODO change limiting by buffer size -> to limiting by time // TODO change limiting by buffer size -> to limiting by time
// (see updateNodeTab for details) // (see notifyAll for details)
notifyq: make(chan _ΔClusterState, 1024), notifyq: make(chan _ΔClusterState, 1024),
notifyqOverflow: make(chan struct{}), notifyqOverflow: make(chan struct{}),
acceptSent: make(chan struct{}), acceptSent: make(chan struct{}),
...@@ -1184,7 +1185,6 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer, ...@@ -1184,7 +1185,6 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
m.peerTab[node.NID] = peer m.peerTab[node.NID] = peer
// spawn task to send accept and proxy δnodeTab/δpartTab to the peer // spawn task to send accept and proxy δnodeTab/δpartTab to the peer
// XXX -> func m.acceptPeer ?
peer.wg.Go(func(ctx context.Context) error { peer.wg.Go(func(ctx context.Context) error {
// go main <- peer "peer (should be) disconnected" when all peer's task finish // go main <- peer "peer (should be) disconnected" when all peer's task finish
m.mainWG.Go(func(ctx context.Context) error { m.mainWG.Go(func(ctx context.Context) error {
...@@ -1204,36 +1204,11 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer, ...@@ -1204,36 +1204,11 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
} }
}) })
// XXX err -> indicated that accept0 failed ? // send accept and indicate to run that initial acceptance is done
err := peer.accept(ctx)
// XXX close link on ctx cancel?
link := peer.node.Link()
// send acceptance to just identified peer
err := n.req.Reply(accept)
if err != nil {
return fmt.Errorf("send accept: %w", err)
}
// send initial state snapshot to accepted node
// nodeTab
err = link.Send1(&peer.state0.NodeTab)
if err != nil {
return fmt.Errorf("send nodeTab: %w", err)
}
// partTab (not to S until cluster is RUNNING)
if !(peer.node.Type == proto.STORAGE && peer.state0.Code != proto.ClusterRunning) {
err = link.Send1(&peer.state0.PartTab)
if err != nil { if err != nil {
return fmt.Errorf("send partTab: %w", err) return nil
}
} }
// XXX send clusterState too? (NEO/py does not send it)
// indicate to run that initial acceptance is done
close(peer.acceptSent) close(peer.acceptSent)
// proxy δnodeTab,δpartTab/δclusterState from main to the peer // proxy δnodeTab,δpartTab/δclusterState from main to the peer
...@@ -1243,7 +1218,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer, ...@@ -1243,7 +1218,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
return peer, true return peer, true
} }
// XXX run runs f after initial phase of peer acceptance is over. // run runs f after initial phase of peer acceptance is over.
// //
// XXX this is very similar if a separate Accept call would return peers // XXX this is very similar if a separate Accept call would return peers
// already identified and answered with initial accept message sequence. // already identified and answered with initial accept message sequence.
...@@ -1258,14 +1233,45 @@ func (p *_MasteredPeer) run(ctx context.Context, f func() error) error { ...@@ -1258,14 +1233,45 @@ func (p *_MasteredPeer) run(ctx context.Context, f func() error) error {
// XXX in general we should also wait for if "accept0 failed". However // XXX in general we should also wait for if "accept0 failed". However
// as that means accept0 task error, it would cancel ctx for all other // as that means accept0 task error, it would cancel ctx for all other
// tasks run through p.wg . And run is called with contexts whose // tasks run through p.wg . And run is called with contexts whose
// cancel is derived from wg cancel - so we don't check for that. XXX // cancel is derived from wg cancel - so we don't check for that.
case <-p.acceptSent: case <-p.acceptSent:
return f() return f()
} }
} }
// accept replies to the peer's identification request and pushes the
// initial state0 snapshot (nodeTab, and conditionally partTab) to it.
func (p *_MasteredPeer) accept(ctx context.Context) (err error) {
	defer task.Runningf(&ctx, "accept %s", p.node.NID)(&err)

	// acknowledge identification to the just-identified peer
	if err = p.idReq.Reply(p.acceptMsg); err != nil {
		return fmt.Errorf("send accept: %w", err)
	}

	plink := p.node.Link()

	// push snapshot of the node table taken at acceptance time
	if err = plink.Send1(&p.state0.NodeTab); err != nil {
		return fmt.Errorf("send nodeTab: %w", err)
	}

	// push the partition table too, except to storages while the
	// cluster is not yet RUNNING
	if p.node.Type != proto.STORAGE || p.state0.Code == proto.ClusterRunning {
		if err = plink.Send1(&p.state0.PartTab); err != nil {
			return fmt.Errorf("send partTab: %w", err)
		}
	}

	// XXX send clusterState too? (NEO/py does not send it)
	return nil
}
// notify proxies δnodeTab/δpartTab/δClusterState update to the peer. // notify proxies δnodeTab/δpartTab/δClusterState update to the peer.
// XXX merge into m.acceptPeer ?
func (p *_MasteredPeer) notify(ctx context.Context) (err error) { func (p *_MasteredPeer) notify(ctx context.Context) (err error) {
defer task.Runningf(&ctx, "notify %s", p.node.NID)(&err) defer task.Runningf(&ctx, "notify %s", p.node.NID)(&err)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment