Commit 0e4fc73e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 0beb359a
@@ -330,9 +330,8 @@ func TestMasterStorage(t *testing.T) {
// TODO M.Stop() while verify
// verification ok; M start service
// TODO
tc.Expect(clusterState(&M.clusterState, neo.ClusterRunning))
// expect:
// M.clusterState <- RUNNING + TODO it should be sent to S
// TODO S leave while service
@@ -321,7 +321,7 @@ loop:
if node == nil {
goreject(ctx, wg, n.conn, resp)
return
break
}
// if new storage arrived - start recovery on it too
@@ -585,7 +585,7 @@ loop:
if node == nil {
goreject(ctx, wg, n.conn, resp)
return
break
}
// new storage arrived - start verification on it too
@@ -664,13 +664,6 @@ loop:
}
}
/*
// consume left verify responses (which should come without delay since it was cancelled)
for ; inprogress > 0; inprogress-- {
<-verify
}
*/
// wait for all workers to finish (which should happen without delay since the context was cancelled)
done := make(chan struct{})
go func() {
@@ -756,13 +749,13 @@ func storCtlVerify(ctx context.Context, stor *neo.Node, pt *neo.PartitionTable,
}
// Cluster Running
// ---------------
// Cluster Running (operational service)
// -------------------------------------
//
// - starts with operational parttab and all present storage nodes consistently
// either finished or rolled-back partly finished transactions
// - monitor storages come & go and if parttab becomes non-operational leave to recovery
// - provide service to clients while we are here
// either finished or rolled-back partly finished transactions.
// - monitor storages come & go and if parttab becomes non-operational leave to recovery.
// - provide service to clients while we are here.
//
// TODO also plan data movement on new storage nodes appearing
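The "leave to recovery" rule above is the same operational check the verification loop uses. A minimal sketch of that check, reusing OperationalWith and errClusterDegraded from this file (the exact call site here is illustrative, not the commit's code):

	// sketch: service leaves back to recovery once the partition
	// table is no longer operational with the known nodes.
	if !m.partTab.OperationalWith(&m.nodeTab) {
		cancel()                  // stop per-storage service drivers
		return errClusterDegraded // caller re-enters recovery
	}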
@@ -774,125 +767,55 @@ func storCtlVerify(ctx context.Context, stor *neo.Node, pt *neo.PartitionTable,
func (m *Master) service(ctx context.Context) (err error) {
defer running(&ctx, "service")(&err)
// XXX we also need to tell storages StartOperation first
m.setClusterState(neo.ClusterRunning)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// XXX spawn per-storage driver about nodetab
wg := &sync.WaitGroup{}
// spawn per-storage service driver
for _, stor := range m.nodeTab.StorageList() {
if stor.State == neo.RUNNING { // XXX note PENDING - not adding to service; ok?
wg.Add(1)
go func() {
defer wg.Done()
storCtlService(ctx, stor, service)
}()
}
}
loop:
for {
select {
// a node connected and requests identification
// new connection comes in
case n := <-m.nodeCome:
node, resp := m.identify(ctx, n, /* XXX accept everyone */)
//state := m.clusterState
_ = node
_ = resp
/*
if node == nil {
goreject(ctx, wg, n.conn, resp)
break
}
wg.Add(1)
go func() {
defer wg.Done()
if !ok {
reject(n.conn, resp)
return
}
err = accept(n.conn, resp)
if err {
// XXX
if err != nil {
service <- nodeService{node: node, err: err}
return
}
switch node.Type {
case STORAGE:
switch state {
case ClusterRecovery:
storCtlRecovery(xxx, node, recovery)
case neo.STORAGE:
storCtlService(ctx, node, service)
case ClusterVerifying, ClusterRunning:
storCtlVerify(xxx, node, verify)
//case neo.CLIENT:
// serveClient(ctx, node, service)
// XXX ClusterStopping
}
case CLIENT:
// TODO
// XXX ADMIN
}
}()
*/
/*
// a storage node came through recovery - if we are still recovering let's see whether
// ptid ↑ and if so we should take partition table from there
case r := <-recovery:
if m.ClusterState == ClusterRecovering {
if r.err != nil {
// XXX err ctx?
// XXX log here or in producer?
m.logf("%v", r.err)
// TODO close stor link / update .nodeTab
break
}
// we are interested in latest partTab
// NOTE during recovery no one must be subscribed to
// partTab so it is ok to simply change whole m.partTab
if r.partTab.PTid > m.partTab.PTid {
m.partTab = r.partTab
}
// XXX handle case of new cluster - when no storage reports valid parttab
// XXX -> create new parttab
// XXX update something indicating cluster currently can be operational or not ?
}
// proceed to storage verification if we want the
// storage to eventually join operations
switch m.ClusterState {
case ClusterVerifying:
case ClusterRunning:
wg.Add(1)
go func() {
defer wg.Done()
storCtlVerify(xxx, r.node, verify)
}()
// XXX note e.g. ClusterStopping - not starting anything on the storage
}
// a storage node came through verification - adjust our last{Oid,Tid} if ok
// on error check - whether cluster became non-operational and reset to recovery if so
// XXX was "stop verification if so"
case v := <-verify:
if v.err != nil {
m.logf("verify: %v", v.err)
// mark storage as non-working in nodeTab
// FIXME better -> v.node.setState(DOWN) ?
//m.nodeTab.UpdateLinkDown(v.link)
m.nodeTab.UpdateDown(v.node)
// check partTab is still operational
// if not -> cancel to go back to recovery
if !m.partTab.OperationalWith(&m.nodeTab) {
vcancel()
err = errClusterDegraded
break loop
}
} else {
if v.lastOid > m.lastOid {
m.lastOid = v.lastOid
}
if v.lastTid > m.lastTid {
m.lastTid = v.lastTid
}
}
// XXX if m.partTab.OperationalWith(&.nodeTab, RUNNING) -> break (ok)
*/
case n := <-m.nodeLeave:
m.nodeTab.SetNodeState(n.node, neo.DOWN)
@@ -906,33 +829,8 @@ loop:
// XXX what else ? (-> txn control at least)
/*
case ech := <-m.ctlStart:
switch m.clusterState {
case neo.ClusterVerifying, neo.ClusterRunning:
ech <- nil // we are already started
case neo.ClusterRecovering:
if m.partTab.OperationalWith(&m.nodeTab) {
// reply "ok to start" after whole recovery finishes
// XXX ok? we want to retrieve all recovery information first?
// XXX or initially S is in PENDING state and
// transitions to RUNNING only after successful recovery?
rcancel()
defer func() {
ech <- nil
}()
break loop
}
ech <- fmt.Errorf("start: cluster is non-operational")
// XXX case ClusterStopping:
}
*/
ech <- nil // we are already started
case ech := <-m.ctlStop:
close(ech) // ok
@@ -949,6 +847,26 @@ loop:
return err
}
// storCtlService drives a storage node during cluster service state
// XXX text
func storCtlService(ctx context.Context, stor *neo.Node, srv chan serviceXXX) {
/*
var err error
defer func() {
if err == nil {
return
}
// on error provide feedback to main driver
*/
conn := stor.Conn
ready := neo.NotifyReady{}
err := conn.Ask(&neo.StartOperation{Backup: false}, &ready)
if err != nil {
// XXX ...
}
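The error path above is still a stub. A minimal sketch of how it could report back to the main service loop, assuming serviceXXX carries a {node, err} pair like the nodeService messages used elsewhere in this file (the field names are an assumption):

	// sketch: feed the failure back to the main service driver
	// instead of leaving the "// XXX ..." stub above.
	if err != nil {
		srv <- serviceXXX{node: stor, err: err} // hypothetical fields
		return
	}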
// identify processes the identification request of a just-connected node and either accepts or declines it.
// If node identification is accepted, .nodeTab is updated and the corresponding node entry is returned.
// Response message is constructed but not sent back, so as not to block the caller - it is
@@ -1074,315 +992,3 @@ func (m *Master) allocUUID(nodeType neo.NodeType) neo.NodeUUID {
panic("all uuid allocated ???") // XXX more robust ?
}
/* XXX goes away
// ServeLink serves incoming node-node link connection
// XXX +error return?
func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) {
logf := func(format string, argv ...interface{}) {
m.logf("%s: " + format + "\n", append([]interface{}{link}, argv...)...)
}
logf("serving new node")
// close link when either cancelling or returning (e.g. due to an error)
// ( when cancelling - link.Close will signal to all current IO to
// terminate with an error )
// XXX dup -> utility
retch := make(chan struct{})
defer func() { close(retch) }()
go func() {
select {
case <-ctx.Done():
// XXX tell peers we are shutting down?
// XXX ret err = ctx.Err()
case <-retch:
}
logf("closing link")
link.Close() // XXX err
}()
// identify peer
// the first conn must come with RequestIdentification packet
conn, err := link.Accept()
if err != nil {
logf("identify: %v", err)
return
}
idReq := &neo.RequestIdentification{}
_, err = conn.Expect(idReq)
if err != nil {
logf("identify: %v", err)
// XXX ok to let peer know error as is? e.g. even IO error on Recv?
err = conn.Send(&neo.Error{neo.PROTOCOL_ERROR, err.Error()})
if err != nil {
logf("failed to send error: %v", err)
}
return
}
// convey identification request to master and we are done here - the master takes on the torch
m.nodeCome <- nodeCome{conn, idReq, nilXXX kill}
//////////////////////////////////////////////////
// if master accepted this node - don't forget to notify when it leaves
_, rejected := idResp.(error)
if !rejected {
defer func() {
m.nodeLeave <- nodeLeave{link}
}()
}
// let the peer know identification result
err = conn.Send(idResp)
if err != nil {
return
}
// nothing to do more here if identification was not accepted
if rejected {
logf("identify: %v", idResp)
return
}
logf("identify: accepted")
// FIXME vvv must be notified only after recovering is done
// (while recovering is in progress we must _not_ send partition table updates to S)
// XXX on successful identification master should also give us:
// - full snapshots of nodeTab, partTab and clusterState
// - buffered notification channel subscribed to changes of ^^^
// - unsubscribe func XXX needed? -> nodeLeave is enough
// ----------------------------------------
// XXX recheck vvv
// XXX temp hack
connNotify := conn
// subscribe to nodeTab/partTab/clusterState and notify peer with updates
m.stateMu.Lock()
nodeCh, nodeUnsubscribe := m.nodeTab.SubscribeBuffered()
_ = nodeUnsubscribe
//partCh, partUnsubscribe := m.partTab.SubscribeBuffered()
// TODO cluster subscribe
//clusterCh := make(chan ClusterState)
//m.clusterNotifyv = append(m.clusterNotifyv, clusterCh)
// NotifyPartitionTable PM -> S, C
// PartitionChanges PM -> S, C // subset of NotifyPartitionTable (?)
// NotifyNodeInformation PM -> *
// TODO read initial nodeTab/partTab while still under lock
// TODO send later this initial content to peer
// TODO notify about cluster state changes
// ClusterInformation (PM -> * ?)
m.stateMu.Unlock()
go func() {
var msg neo.Msg
for {
select {
case <-ctx.Done():
// TODO unsubscribe
// XXX we are not draining on cancel - how to free internal buffer ?
return
case nodeUpdateV := <-nodeCh:
msg = &neo.NotifyNodeInformation{
IdTimestamp: math.NaN(), // XXX
NodeList: nodeUpdateV,
}
//case clusterState = <-clusterCh:
// changed = true
}
err = connNotify.Send(msg)
if err != nil {
// XXX err
}
}
}()
// identification passed, now serve other requests
// client: notify + serve requests
m.ServeClient(ctx, link)
// storage:
m.DriveStorage(ctx, link)
/////////////////
}
*/
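Distilled from the removed ServeLink above, the notification pump reduces to a subscribe-and-forward loop. A sketch reusing the SubscribeBuffered and NotifyNodeInformation names from that block (error handling simplified; IdTimestamp left out since it was still an XXX):

	// sketch: forward nodeTab updates to one peer until ctx is done
	// or the peer goes away.
	nodeCh, nodeUnsubscribe := m.nodeTab.SubscribeBuffered()
	defer nodeUnsubscribe()
	for {
		select {
		case <-ctx.Done():
			return
		case nodeUpdateV := <-nodeCh:
			msg := &neo.NotifyNodeInformation{NodeList: nodeUpdateV}
			if err := conn.Send(msg); err != nil {
				return // peer is gone - stop notifying
			}
		}
	}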
// ServeClient serves incoming connection on which peer identified itself as client
// XXX +error return?
//func (m *Master) ServeClient(ctx context.Context, conn *neo.Conn) {
func (m *Master) ServeClient(ctx context.Context, link *neo.NodeLink) {
// TODO
}
// ---- internal requests for storage driver ----
// XXX goes away
/*
// storageRecovery asks storage driver to extract cluster recovery information from storage
type storageRecovery struct {
resp chan PartitionTable // XXX +err ?
}
// storageVerify asks storage driver to perform verification (i.e. "data recovery") operation
type storageVerify struct {
// XXX what is result ?
}
// storageStartOperation asks storage driver to start storage node operating
type storageStartOperation struct {
resp chan error // XXX
}
// storageStopOperation asks storage driver to stop storage node operating
type storageStopOperation struct {
resp chan error
}
*/
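Although marked to go away, these types document the request/response-over-channel pattern between the master loop and a storage driver. A usage sketch; driverq is a hypothetical per-driver request channel, not a name from this file:

	// sketch: ask the driver to start operation, then wait for its answer
	// on a private resp channel.
	req := storageStartOperation{resp: make(chan error, 1)}
	driverq <- req // hypothetical request channel to the driver
	if err := <-req.resp; err != nil {
		// the storage could not start operating - log / mark it down
	}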
// DriveStorage serves incoming connection on which peer identified itself as storage
//
// There are 2 connections:
// - notifications: unidirectional M -> S notifications (nodes, parttab, cluster state)
// - control: bidirectional M <-> S
//
// In control communication the master always drives the exchange - it talks
// first, e.g. with a command or request, and expects the corresponding answer
//
// XXX +error return?
func (m *Master) DriveStorage(ctx context.Context, link *neo.NodeLink) {
// ? >UnfinishedTransactions
// ? <AnswerUnfinishedTransactions (none currently)
// TODO go for notify chan
for {
select {
case <-ctx.Done():
return // XXX recheck
// // request from master to do something
// case mreq := <-xxx:
// switch mreq := mreq.(type) {
// case storageRecovery:
// case storageVerify:
// // TODO
// case storageStartOperation:
// // XXX timeout ?
// // XXX -> chat2 ?
// err = neo.EncodeAndSend(conn, &StartOperation{Backup: false /* XXX hardcoded */})
// if err != nil {
// // XXX err
// }
// pkt, err := RecvAndDecode(conn)
// if err != nil {
// // XXX err
// }
// switch pkt := pkt.(type) {
// default:
// err = fmt.Errorf("unexpected answer: %T", pkt)
// case *NotifyReady:
// }
// // XXX better in m.nodeq ?
// mreq.resp <- err // XXX err ctx
// case storageStopOperation:
// // TODO
// }
}
}
// RECOVERY (master.recovery.RecoveryManager + master.handlers.identification.py)
// --------
// """
// Recover the status about the cluster. Obtain the last OID, the last
// TID, and the last Partition Table ID from storage nodes, then get
// back the latest partition table or make a new table from scratch,
// if this is the first time.
// A new primary master may also arise during this phase.
// """
//
// m.clusterState = Recovering
// m.partTab.clear()
//
// - wait for S nodes to connect and process recovery phases on them
// - if pt.filled() - we are starting an existing cluster
// - else if autostart and N(S, connected) >= min_autostart -> starting new cluster
// - (handle truncation if .truncate_tid is set)
//
// >Recovery
// <AnswerRecovery (ptid, backup_tid, truncate_tid)
//
// >PartitionTable
// <AnswerPartitionTable (ptid, []{pid, []cell})
//
// NOTE ^^^ need to collect PT from all storages and choose one with highest ptid
// NOTE same for backup_tid & truncate_tid
//
//
// # neoctl start
// # (via changing nodeTab and relying on broadcast distribution ?)
// >NotifyNodeInformation (S1.state=RUNNING)
// # S: "I was told I'm RUNNING" XXX ^^^ -> StartOperation
//
// # (via changing m.clusterState and relying on broadcast ?)
// >NotifyClusterInformation (cluster_state=VERIFYING)
//
// # (via changing partTab and relying on broadcast ?) -> no sends whole PT initially
// >NotifyPartitionTable (ptid=1, `node 0: S1, R`)
// # S saves PT info locally XXX -> after StartOperation ?
//
//
// VERIFICATION (master.verification.py)
// ------------
//
// # M asks about unfinished transactions
// >AskLockedTransactions
// <AnswerLockedTransactions {} ttid -> tid # in example we have empty
//
// >LastIDs
// <AnswerLastIDs (last_oid, last_tid)
//
// # (via changing m.clusterState and relying on broadcast ?)
// >NotifyClusterInformation (cluster_state=RUNNING) XXX -> StartOperation
//
// >StartOperation
// <NotifyReady
// XXX only here we can update nodeTab with S1.state=RUNNING
//
// ...
//
// StopOperation PM -> S
}
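The RECOVERY notes above map onto Conn.Ask round-trips plus the highest-ptid rule from the NOTE. A sketch for one storage; the Go struct names are assumed from the message names in the annotations, and decodePartTab is a hypothetical helper:

	// sketch: master-driven recovery exchange with a single storage.
	recovery := neo.AnswerRecovery{}
	if err := conn.Ask(&neo.Recovery{}, &recovery); err != nil {
		return err
	}
	pt := neo.AnswerPartitionTable{}
	if err := conn.Ask(&neo.AskPartitionTable{}, &pt); err != nil {
		return err
	}
	// keep the partition table with the highest ptid across all storages
	if pt.PTid > m.partTab.PTid {
		m.partTab = decodePartTab(pt) // hypothetical decoding helper
	}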
func (m *Master) ServeAdmin(ctx context.Context, conn *neo.Conn) {
// TODO
}
func (m *Master) ServeMaster(ctx context.Context, conn *neo.Conn) {
// TODO (for elections)
}