Commit 71f7d77e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 242c3d85
......@@ -53,6 +53,8 @@ type Master struct {
ctlStop chan ctlStop // request to stop cluster
ctlShutdown chan chan error // request to shutdown cluster XXX with ctx too ?
wantToStart chan chan error // main -> recovery
// channels from various workers to main driver
nodeCome chan nodeCome // node connected
nodeLeave chan nodeLeave // node disconnected
......@@ -95,23 +97,68 @@ func NewMaster(clusterName string) *Master {
// XXX NotifyNodeInformation to all nodes whenever nodetab changes
// XXX -> Start(), Stop()
/*
func (m *Master) SetClusterState(state ClusterState) error {
// Start requests cluster to eventually transition into running state
// it returns an error if such transition is not currently possible (e.g. partition table is not operational)
// it returns nil if the transition began.
// NOTE upon successfull return cluster is not yet in running state - the transition will
// take time and could be also automatically aborted due to cluster environment change (e.g.
// a storage node goes down)
func (m *Master) Start() error {
ch := make(chan error)
m.ctlState <- ctlState{state, ch}
m.ctlStart <- ctlStart{ch}
return <-ch
}
*/
// run implements main master cluster management logic: node tracking, cluster XXX -> only top-level
// Stop requests cluster to eventually transition into recovery state
// XXX should be always possible ?
func (m *Master) Stop() error {
ch := make(chan error)
m.ctlStop <- ctlStop{ch}
return <-ch
}
// Shutdown requests all known nodes in the cluster to stop
func (m *Master) Shutdown() error {
panic("TODO")
}
func (m *Master) setClusterState(state ClusterState) {
m.clusterState = state
// TODO notify subscribers
}
func (m *Master) xxx(ctx ...) {
var err error
for ctx.Err() == nil {
err = recovery(ctx)
if err != nil {
return // XXX
}
// successful recovery -> verify
err = verify(ctx)
if err != nil {
continue // -> recovery
}
// successful verify -> service
err = service(ctx)
if err != nil {
// XXX what about shutdown ?
continue // -> recovery
}
}
}
// run implements main master cluster management logic: node tracking, cluster
// state updates, scheduling data movement between storage nodes etc
func (m *Master) run(ctx context.Context) {
// // current function to ask/control a storage depending on current cluster state
// // + associated context covering all storage nodes
// // XXX + waitgroup ?
// storCtl := m.storCtlRecovery
// storCtlCtx, storCtlCancel := context.WithCancel(ctx)
go m.recovery(ctx)
for {
select {
......@@ -128,18 +175,24 @@ func (m *Master) run(ctx context.Context) {
break
}
// check preconditions for start
if !m.partTab.OperationalWith(&m.nodeTab) {
// XXX err ctx
// TODO + how much % PT is covered
c.resp <- fmt.Errorf("start: non-operational partition table")
ch := make(chan error)
select {
case <-ctx.Done():
// XXX how to avoid checking this ctx.Done everywhere?
c.resp <- ctx.Err()
panic("TODO")
case m.wantToStart <- ch:
}
err := <-ch
c.resp <- err
if err != nil {
break
}
// XXX cancel/stop/wait for current recovery task
// XXX start starting task
// recovery said it is ok to start and finished - launch verification
m.setClusterState(ClusterVerifying)
go m.verify(ctx)
// command to stop cluster
case <-m.ctlStop:
......@@ -147,40 +200,7 @@ func (m *Master) run(ctx context.Context) {
// command to shutdown
case <-m.ctlShutdown:
/*
// node connects & requests identification
case n := <-m.nodeCome:
nodeInfo, ok := m.accept(n)
if !(ok && nodeInfo.NodeType == STORAGE) {
break
}
// new storage node joined cluster
// XXX consider .clusterState change
// launch current storage control work on the joined node
go storCtl(storCtlCtx, n.link)
// TODO consider adjusting partTab
// node disconnects
case _ = <-m.nodeLeave:
// TODO
// a storage node came through recovery - let's see whether
// ptid ↑ and if so we should take partition table from there
case r := <-m.storRecovery:
if r.partTab.ptid > m.partTab.ptid {
m.partTab = r.partTab
// XXX also transfer subscribers ?
// XXX -> during recovery no one must be subscribed to partTab
}
// XXX consider clusterState change
*/
}
}
......@@ -205,7 +225,7 @@ func (m *Master) recovery(ctx context.Context) {
defer rcancel()
inprogress := 0
// start recovery on all storages we are currently in touch
// start recovery on all storages we are currently in touch with
for _, stor := range m.nodeTab.StorageList() {
if stor.Info.NodeState > DOWN { // XXX state cmp ok ?
inprogress++
......@@ -216,10 +236,6 @@ func (m *Master) recovery(ctx context.Context) {
loop:
for {
select {
case <-ctx.Done():
// XXX
break loop
case n := <-m.nodeCome:
node, ok := m.accept(n, /* XXX do not accept clients */)
if !ok {
......@@ -234,7 +250,8 @@ loop:
m.nodeTab.UpdateLinkDown(n.link)
// XXX update something indicating cluster currently can be operational or not ?
// result of a storage recovery
// a storage node came through recovery - let's see whether
// ptid ↑ and if so we should take partition table from there
case r := <-recovery:
inprogress--
......@@ -254,9 +271,11 @@ loop:
// XXX update something indicating cluster currently can be operational or not ?
// request from master: request "ok to start?" - if ok we reply ok and exit
// request from master: "I want to start - ok?" - if ok we reply ok and exit
// if not ok - we just reply not ok
case s := <-...:
//case s := <-m.wantToStart:
case c := <-m.ctlStart:
if m.partTab.OperationalWith(&m.nodeTab) {
// reply "ok to start" after whole recovery finishes
......@@ -267,12 +286,19 @@ loop:
rcancel()
defer func() {
s.resp <- nil
s <- nil
}()
break loop
}
s.resp <- fmt.Errorf("cluster is non-operational")
s <- fmt.Errorf("start: cluster is non-operational")
case c := <-m.ctlStop:
c.resp <- nil // we are already recovering
case <-ctx.Done():
// XXX
break loop
}
}
......@@ -363,9 +389,7 @@ func storCtlRecovery(ctx context.Context, link *NodeLink, res chan storRecovery)
// verify is a process that drives cluster via verification phase
//
// prerequisite for start: .partTab is operational wrt .nodeTab
//
// XXX draft: Cluster Verify if []Stor is fixed
func (m *Master) verify(ctx context.Context, storv []*NodeLink) error {
func (m *Master) verify(ctx context.Context) error { //, storv []*NodeLink) error {
// XXX ask every storage for verify and wait for _all_ them to complete?
var err error
......@@ -376,17 +400,20 @@ func (m *Master) verify(ctx context.Context, storv []*NodeLink) error {
// XXX do we need to reset m.lastOid / m.lastTid to 0 in the beginning?
for _, stor := range storv {
// start verification on all storages we are currently in touch with
for _, stor := range m.nodeTab.StorageList() {
inprogress++
go storCtlVerify(vctx, stor, verify)
go storCtlVerify(vctx, stor.Link, verify)
}
loop:
for inprogress > 0 {
select {
case <-ctx.Done():
err = ctx.Err()
break loop
case n := <-m.nodeCome:
// TODO
case n := <-m.nodeLeave:
// TODO
case v := <-verify:
inprogress--
......@@ -410,6 +437,19 @@ loop:
m.lastTid = v.lastTid
}
}
case c := <-m.ctlStart:
c.resp <- nil // we are already starting
case c := <-m.ctlStop:
c.resp <- nil // ok
err = fmt.Errorf("stop requested")
break loop
case <-ctx.Done():
err = ctx.Err()
break loop
}
}
......@@ -484,14 +524,39 @@ func storCtlVerify(ctx context.Context, link *NodeLink, res chan storVerify) {
// service is the process that drives cluster during running state
//
// XXX draft: Cluster Running if []Stor is fixed
func (m *Master) service(ctx context.Context, storv []*NodeLink) {
func (m *Master) service(ctx context.Context) {
loop:
for {
select {
case n := <-m.nodeCome:
// TODO
}
// XXX draft: Cluster Stopping if []Stor is fixed
func (m *Master) stop(ctx context.Context, storv []*NodeLink) {
case n := <-m.nodeLeave:
// TODO
// XXX what else ? (-> txn control at least)
case c := <-m.ctlStart:
c.resp <- nil // we are already started
case c := <-m.ctlStop:
c.resp <- nil // ok
err = fmt.Errorf("stop requested")
break loop
case <-ctx.Done():
err = ctx.Err()
break loop
}
}
if err != nil {
// TODO
}
return err
}
// accept processes identification request of just connected node and either accepts or declines it
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment