Commit e02a2e7f authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e8ac7104
......@@ -292,10 +292,10 @@ Commit communications
for S' in storagesFor(ttid):
C -> S' AskStoreTransaction(ttid) # empty answer
for S'' in (storagesFor(oid) - storagesFor):
C -> S'' AskVoteTransaction(ttid) # empty answer
for S" in (storagesFor(oid) - storagesFor):
C -> S" AskVoteTransaction(ttid) # empty answer
- wait for all responses from S' and S'' ^^^
- wait for all responses from S' and S" ^^^
* tpc_finish
......@@ -310,6 +310,10 @@ C -> M AskFinishTransaction(ttid, cache_dict, checked_list)
C <- M AnswerTransactionFinished(ttid, tid)
for S in M.allStorages:
M -> S NotifyUnlockInformation(ttid)
* tpc_abort
......
......@@ -424,7 +424,7 @@ type FailedVote struct {
// Finish a transaction. C -> PM.
// Answer when a transaction is finished. PM -> C.
type FinishTransaction struct {
Tid zodb.Tid
Tid zodb.Tid // XXX this is ttid
OIDList []zodb.Oid
CheckedList []zodb.Oid
}
......
......@@ -384,6 +384,7 @@ loop:
// storRecovery is result of a storage node passing recovery phase
type storRecovery struct {
node *Node
partTab neo.PartitionTable
// XXX + backup_tid, truncate_tid ?
......@@ -578,9 +579,10 @@ loop:
// storVerify is result of a storage node passing verification phase
type storVerify struct {
node *Node
lastOid zodb.Oid
lastTid zodb.Tid
link *neo.NodeLink // XXX -> Node
// link *neo.NodeLink // XXX -> Node
err error
}
......@@ -651,13 +653,112 @@ func (m *Master) service(ctx context.Context) (err error) {
loop:
for {
select {
// a node connected and requests identification
case n := <-m.nodeCome:
_, ok := m.accept(n, /* XXX accept everyone */)
if !ok {
break
node, resp, ok := m.accept(n, /* XXX accept everyone */)
state := m.ClusterState
wg.Add(1)
go func() {
defer wg.Done()
if !ok {
reject(n.conn, resp)
return
}
err = welcome(n.conn, resp)
if err {
// XXX
}
switch node.NodeType {
case STORAGE:
switch state {
case ClusterRecovery:
storCtlRecovery(xxx, node, recovery)
case ClusterVerifying, ClusterRunning:
storCtlVerify(xxx, node, verify)
// XXX ClusterStopping
}
case CLIENT:
// TODO
}
}()
// a storage node came through recovery - if we are still recovering let's see whether
// ptid ↑ and if so we should take partition table from there
case r := <-recovery:
if m.ClusterState == ClusterRecovering {
if r.err != nil {
// XXX err ctx?
// XXX log here or in producer?
fmt.Printf("master: %v\n", r.err)
// TODO close stor link / update .nodeTab
break
}
// we are interested in latest partTab
// NOTE during recovery no one must be subscribed to
// partTab so it is ok to simply change whole m.partTab
if r.partTab.PTid > m.partTab.PTid {
m.partTab = r.partTab
}
// XXX handle case of new cluster - when no storage reports valid parttab
// XXX -> create new parttab
// XXX update something indicating cluster currently can be operational or not ?
}
// proceed to storage verification if we want the
// storage to eventually join operations
switch m.ClusterState {
case ClusterVerifying:
case ClusterRunning:
wg.Add(1)
go func() {
defer wg.Done()
storCtlVerify(xxx, r.node, verify)
}()
// XXX note e.g. ClusterStopping - not starting anything on the storage
}
// a storage node came through verification - adjust our last{Oid,Tid} if ok
// on error check - whether cluster became non-operational and reset to recovery if so
// XXX was "stop verification if so"
case v := <-verify:
if v.err != nil {
fmt.Printf("master: verify: %v\n", v.err)
// mark storage as non-working in nodeTab
// FIXME better -> v.node.setState(DOWN) ?
//m.nodeTab.UpdateLinkDown(v.link)
m.nodeTab.UpdateDown(v.node)
// check partTab is still operational
// if not -> cancel to go back to recovery
if !m.partTab.OperationalWith(&m.nodeTab) {
vcancel()
err = errClusterDegraded
break loop
}
} else {
if v.lastOid > m.lastOid {
m.lastOid = v.lastOid
}
if v.lastTid > m.lastTid {
m.lastTid = v.lastTid
}
}
// XXX what here ?
// XXX if m.partTab.OperationalWith(&.nodeTab, RUNNING) -> break (ok)
case n := <-m.nodeLeave:
m.nodeTab.UpdateLinkDown(n.link)
......@@ -672,7 +773,30 @@ loop:
// XXX what else ? (-> txn control at least)
case ech := <-m.ctlStart:
ech <- nil // we are already started
switch m.ClusterState {
case ClusterVerifying, ClusterRunning:
ech <- nil // we are already started
case ClusterRecovering:
if m.partTab.OperationalWith(&m.nodeTab) {
// reply "ok to start" after whole recovery finishes
// XXX ok? we want to retrieve all recovery information first?
// XXX or initially S is in PENDING state and
// transitions to RUNNING only after successful recovery?
rcancel()
defer func() {
ech <- nil
}()
break loop
}
ech <- fmt.Errorf("start: cluster is non-operational")
// XXX case ClusterStopping:
}
case ech := <-m.ctlStop:
close(ech) // ok
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment