Commit e993689f authored by Kirill Smelkov's avatar Kirill Smelkov

go/neo: master, storage: move nodeTab, partTab and clusterState into the embedded NodeCommon (.node)

parent 7f7169ce
@@ -199,9 +199,9 @@ func TestMasterStorage(t *testing.T) {
 	}

 	// shortcut for nodetab change
-	node := func(nt *neo.NodeTable, laddr string, typ neo.NodeType, num int32, state neo.NodeState, idtstamp float64) *traceNode {
+	node := func(x *neo.NodeCommon, laddr string, typ neo.NodeType, num int32, state neo.NodeState, idtstamp float64) *traceNode {
 		return &traceNode{
-			NodeTab:  unsafe.Pointer(nt),
+			NodeTab:  unsafe.Pointer(x.NodeTab),
 			NodeInfo: neo.NodeInfo{
 				Type: typ,
 				Addr: xnaddr(laddr),
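
Note: the traceNode event this helper builds is, judging from the fields used above, shaped roughly like the sketch below; the real definition lives elsewhere in the test file and may differ.

	// sketch, inferred from the helper above (not part of this commit):
	// a trace event pairing the originating node table with the updated entry.
	type traceNode struct {
		NodeTab  unsafe.Pointer // identifies which *neo.NodeTable changed
		NodeInfo neo.NodeInfo   // the updated node entry
	}
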
@@ -232,8 +232,8 @@ func TestMasterStorage(t *testing.T) {
 	// M starts listening
 	tc.Expect(netlisten("m:1"))
-	tc.Expect(node(M.nodeTab, "m:1", neo.MASTER, 1, neo.RUNNING, 0.0))
-	tc.Expect(clusterState(&M.clusterState, neo.ClusterRecovering))
+	tc.Expect(node(&M.node, "m:1", neo.MASTER, 1, neo.RUNNING, 0.0))
+	tc.Expect(clusterState(&M.node.ClusterState, neo.ClusterRecovering))

 	// TODO create C; C tries connect to master - rejected ("not yet operational")
@@ -260,7 +260,7 @@ func TestMasterStorage(t *testing.T) {
 		IdTimestamp: 0,
 	}))
-	tc.Expect(node(M.nodeTab, "s:1", neo.STORAGE, 1, neo.PENDING, 0.01))
+	tc.Expect(node(&M.node, "s:1", neo.STORAGE, 1, neo.PENDING, 0.01))
 	tc.Expect(conntx("m:2", "s:2", 1, &neo.AcceptIdentification{
 		NodeType: neo.MASTER,
@@ -297,13 +297,13 @@ func TestMasterStorage(t *testing.T) {
 		exc.Raiseif(err)
 	})
-	tc.Expect(node(M.nodeTab, "s:1", neo.STORAGE, 1, neo.RUNNING, 0.01))
+	tc.Expect(node(&M.node, "s:1", neo.STORAGE, 1, neo.RUNNING, 0.01))
 	xwait(wg)

 	// XXX M.partTab <- S1

 	// M starts verification
-	tc.Expect(clusterState(&M.clusterState, neo.ClusterVerifying))
+	tc.Expect(clusterState(&M.node.ClusterState, neo.ClusterVerifying))
 	tc.Expect(conntx("m:2", "s:2", 1, &neo.NotifyPartitionTable{
 		PTid: 1,
@@ -334,7 +334,7 @@ func TestMasterStorage(t *testing.T) {
 	// TODO M.Stop() while verify

 	// verification ok; M start service
-	tc.Expect(clusterState(&M.clusterState, neo.ClusterRunning))
+	tc.Expect(clusterState(&M.node.ClusterState, neo.ClusterRunning))
 	// TODO ^^^ should be sent to S
 	tc.Expect(conntx("m:2", "s:2", 1, &neo.StartOperation{Backup: false}))
@@ -358,7 +358,7 @@ func TestMasterStorage(t *testing.T) {
 		IdTimestamp: 0,
 	}))
-	tc.Expect(node(M.nodeTab, "", neo.CLIENT, 1, neo.RUNNING, 0.02))
+	tc.Expect(node(&M.node, "", neo.CLIENT, 1, neo.RUNNING, 0.02))
 	tc.Expect(conntx("m:3", "c:1", 1, &neo.AcceptIdentification{
 		NodeType: neo.MASTER,
...
@@ -54,12 +54,12 @@ type Master struct {
 	// to all nodes in cluster

 	// XXX dup from .node - kill here
-	///*
+	/*
 	stateMu      sync.RWMutex // XXX recheck: needed ?
 	nodeTab      *neo.NodeTable
 	partTab      *neo.PartitionTable
 	clusterState neo.ClusterState
-	//*/
+	*/

 	// channels controlling main driver
 	ctlStart chan chan error // request to start cluster
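
Note: the fields disabled above now live on the embedded neo.NodeCommon. Piecing together the accesses this commit introduces (m.node.NodeTab, m.node.PartTab, m.node.ClusterState, m.node.MyInfo, plus ClusterName/Net/MasterAddr in NewMaster), NodeCommon presumably looks roughly like the sketch below; only the field names used in the diff are certain.

	// sketch of neo.NodeCommon as implied by this commit
	// (field set and layout are assumptions)
	type NodeCommon struct {
		MyInfo      NodeInfo // this node's own entry
		ClusterName string

		Net        xnet.Networker // network we are listening/connecting on
		MasterAddr string         // address of the master

		NodeTab      *NodeTable      // nodes in the cluster
		PartTab      *PartitionTable // data distribution
		ClusterState ClusterState    // master's idea of cluster state
	}
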
@@ -101,10 +101,12 @@ func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master {
 			ClusterName: clusterName,
 			Net:         net,
 			MasterAddr:  serveAddr, // XXX ok?
-		},

-		nodeTab: &neo.NodeTable{},
-		partTab: &neo.PartitionTable{},
+			NodeTab:      &neo.NodeTable{},
+			PartTab:      &neo.PartitionTable{},
+			ClusterState: -1, // invalid
+		},

 		ctlStart: make(chan chan error),
 		ctlStop:  make(chan chan struct{}),
@@ -116,7 +118,6 @@ func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master {
 		monotime: monotime,
 	}

-	m.clusterState = -1 // invalid
 	return m
 }
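
Note: moving the -1 initialization into the composite literal establishes the sentinel ("no valid state yet") atomically with construction; -1 is outside the valid neo.ClusterState range, so the first setClusterState is always observed as a change. Hypothetical illustration, not code from this commit:

	// a freshly created master must not report a valid cluster state
	// until recovery explicitly sets one.
	m := NewMaster("test", "", nil)
	if m.node.ClusterState != -1 {
		panic("cluster state must start invalid")
	}
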
@@ -148,7 +149,7 @@ func (m *Master) Shutdown() error {
 // setClusterState sets .clusterState and notifies subscribers
 func (m *Master) setClusterState(state neo.ClusterState) {
-	m.clusterState.Set(state)
+	m.node.ClusterState.Set(state)

 	// TODO notify subscribers
 }
@@ -181,7 +182,7 @@ func (m *Master) Run(ctx context.Context) (err error) {
 	}

 	// update nodeTab with self
-	m.nodeTab.Update(m.node.MyInfo, nil /*XXX ok? we are not connecting to self*/)
+	m.node.NodeTab.Update(m.node.MyInfo, nil /*XXX ok? we are not connecting to self*/)

 	// accept incoming connections and pass them to main driver
@@ -318,7 +319,7 @@ func (m *Master) recovery(ctx context.Context) (err error) {
 	// start recovery on all storages we are currently in touch with
 	// XXX close links to clients
-	for _, stor := range m.nodeTab.StorageList() {
+	for _, stor := range m.node.NodeTab.StorageList() {
 		if stor.State > neo.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
 			inprogress++
 			wg.Add(1)
@@ -374,25 +375,25 @@ loop:
 				// close stor link / update .nodeTab
 				lclose(ctx, r.stor.Link)
-				m.nodeTab.SetNodeState(r.stor, neo.DOWN)
+				m.node.NodeTab.SetNodeState(r.stor, neo.DOWN)
 			}
 		} else {
 			// we are interested in latest partTab
 			// NOTE during recovery no one must be subscribed to
 			// partTab so it is ok to simply change whole m.partTab
-			if r.partTab.PTid > m.partTab.PTid {
-				m.partTab = r.partTab
+			if r.partTab.PTid > m.node.PartTab.PTid {
+				m.node.PartTab = r.partTab
 			}
 		}

 		// update indicator whether cluster currently can be operational or not
 		var ready bool
-		if m.partTab.PTid == 0 {
+		if m.node.PartTab.PTid == 0 {
 			// new cluster - allow startup if we have some storages passed
 			// recovery and there is no in-progress recovery running
 			nup := 0
-			for _, stor := range m.nodeTab.StorageList() {
+			for _, stor := range m.node.NodeTab.StorageList() {
 				if stor.State > neo.DOWN {
 					nup++
 				}
@@ -400,7 +401,7 @@ loop:
 			ready = (nup > 0 && inprogress == 0)
 		} else {
-			ready = m.partTab.OperationalWith(m.nodeTab) // XXX + node state
+			ready = m.node.PartTab.OperationalWith(m.node.NodeTab) // XXX + node state
 		}

 		if readyToStart != ready {
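
Note: the readiness rule spread over the recovery loop can be read as one predicate. Extracted into a standalone sketch for clarity (clusterReady is a hypothetical name, not part of this commit):

	// clusterReady reports whether the cluster could leave recovery.
	func clusterReady(node *neo.NodeCommon, inprogress int) bool {
		if node.PartTab.PTid == 0 {
			// new cluster: need at least one storage up and no recovery in flight
			nup := 0
			for _, stor := range node.NodeTab.StorageList() {
				if stor.State > neo.DOWN {
					nup++
				}
			}
			return nup > 0 && inprogress == 0
		}
		// existing cluster: the partition table must be operational
		// with the storages currently up
		return node.PartTab.OperationalWith(node.NodeTab)
	}
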
@@ -462,7 +463,7 @@ loop2:
 			// close stor link / update .nodeTab
 			lclose(ctx, r.stor.Link)
-			m.nodeTab.SetNodeState(r.stor, neo.DOWN)
+			m.node.NodeTab.SetNodeState(r.stor, neo.DOWN)
 		}

 	case <-done:
@@ -478,24 +479,24 @@ loop2:
 	// S PENDING -> RUNNING
 	// XXX recheck logic is ok for when starting existing cluster
-	for _, stor := range m.nodeTab.StorageList() {
+	for _, stor := range m.node.NodeTab.StorageList() {
 		if stor.State == neo.PENDING {
-			m.nodeTab.SetNodeState(stor, neo.RUNNING)
+			m.node.NodeTab.SetNodeState(stor, neo.RUNNING)
 		}
 	}

 	// if we are starting for new cluster - create partition table
-	if m.partTab.PTid == 0 {
+	if m.node.PartTab.PTid == 0 {
 		log.Infof(ctx, "creating new partition table")
 		// XXX -> m.nodeTab.StorageList(State > DOWN)
 		storv := []*neo.Node{}
-		for _, stor := range m.nodeTab.StorageList() {
+		for _, stor := range m.node.NodeTab.StorageList() {
 			if stor.State > neo.DOWN {
 				storv = append(storv, stor)
 			}
 		}
-		m.partTab = neo.MakePartTab(1 /* XXX hardcoded */, storv)
-		m.partTab.PTid = 1
+		m.node.PartTab = neo.MakePartTab(1 /* XXX hardcoded */, storv)
+		m.node.PartTab.PTid = 1
 	}

 	return nil
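
Note: the filter-by-state loop recurs in recovery, start and verify, and the XXX above already suggests pushing it into NodeTable. One possible shape (hypothetical helper, not part of this commit):

	// StorageListUp: like NodeTable.StorageList, but keeping only
	// storages whose state is above DOWN.
	func StorageListUp(nt *neo.NodeTable) []*neo.Node {
		storv := []*neo.Node{}
		for _, stor := range nt.StorageList() {
			if stor.State > neo.DOWN {
				storv = append(storv, stor)
			}
		}
		return storv
	}
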
@@ -583,13 +584,13 @@ func (m *Master) verify(ctx context.Context) (err error) {
 	// XXX (= py), rationale=?

 	// start verification on all storages we are currently in touch with
-	for _, stor := range m.nodeTab.StorageList() {
+	for _, stor := range m.node.NodeTab.StorageList() {
 		if stor.State > neo.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
 			inprogress++
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
-				storCtlVerify(ctx, stor, m.partTab, verify)
+				storCtlVerify(ctx, stor, m.node.PartTab, verify)
 			}()
 		}
 	}
@@ -617,14 +618,14 @@ loop:
 					return
 				}
-				storCtlVerify(ctx, node, m.partTab, verify)
+				storCtlVerify(ctx, node, m.node.PartTab, verify)
 			}()

 		case n := <-m.nodeLeave:
-			m.nodeTab.SetNodeState(n.node, neo.DOWN)
+			m.node.NodeTab.SetNodeState(n.node, neo.DOWN)

 			// if cluster became non-operational - we cancel verification
-			if !m.partTab.OperationalWith(m.nodeTab) {
+			if !m.node.PartTab.OperationalWith(m.node.NodeTab) {
 				// XXX ok to instantly cancel? or better
 				// graceful shutdown in-flight verifications?
 				vcancel()
@@ -648,12 +649,12 @@ loop:
 				// mark storage as non-working in nodeTab
 				lclose(ctx, v.stor.Link)
-				m.nodeTab.SetNodeState(v.stor, neo.DOWN)
+				m.node.NodeTab.SetNodeState(v.stor, neo.DOWN)
 			}

 			// check partTab is still operational
 			// if not -> cancel to go back to recovery
-			if !m.partTab.OperationalWith(m.nodeTab) {
+			if !m.node.PartTab.OperationalWith(m.node.NodeTab) {
 				vcancel()
 				err = errClusterDegraded
 				break loop
@@ -700,7 +701,7 @@ loop2:
 			// close stor link / update .nodeTab
 			lclose(ctx, v.stor.Link)
-			m.nodeTab.SetNodeState(v.stor, neo.DOWN)
+			m.node.NodeTab.SetNodeState(v.stor, neo.DOWN)
 		}

 	case <-done:
@@ -798,7 +799,7 @@ func (m *Master) service(ctx context.Context) (err error) {
 	wg := &sync.WaitGroup{}

 	// spawn per-storage service driver
-	for _, stor := range m.nodeTab.StorageList() {
+	for _, stor := range m.node.NodeTab.StorageList() {
 		if stor.State == neo.RUNNING { // XXX note PENDING - not adding to service; ok?
 			wg.Add(1)
 			go func() {
@@ -850,10 +851,10 @@ loop:
 		// XXX who sends here?
 		case n := <-m.nodeLeave:
-			m.nodeTab.SetNodeState(n.node, neo.DOWN)
+			m.node.NodeTab.SetNodeState(n.node, neo.DOWN)

 			// if cluster became non-operational - cancel service
-			if !m.partTab.OperationalWith(m.nodeTab) {
+			if !m.node.PartTab.OperationalWith(m.node.NodeTab) {
 				err = errClusterDegraded
 				break loop
 			}
@@ -942,8 +943,12 @@ func (m *Master) serveClient(ctx context.Context, cli *neo.Node) (err error) {
 func (m *Master) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Msg) {
 	switch req := req.(type) {
 	case *neo.AskPartitionTable:
-		// XXX
-		panic("TODO")
+		// XXX lock
+		rpt := &neo.AnswerPartitionTable{
+			PTid:    m.node.PartTab.PTid,
+			RowList: m.node.PartTab.Dump(),
+		}
+		return rpt

 	default:
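
Note: AnswerPartitionTable carries the table in wire form, so Dump presumably flattens the in-memory rows into []RowInfo. A sketch under that assumption (the PartitionTable internals here are guesses; only the Dump call above is from the commit):

	// sketch: flatten the partition table into protocol rows.
	func (pt *PartitionTable) Dump() []RowInfo {
		rowv := make([]RowInfo, len(pt.tab))
		for i, row := range pt.tab {
			cellv := make([]CellInfo, len(row))
			for j, cell := range row {
				cellv[j] = cell.CellInfo
			}
			rowv[i] = RowInfo{Offset: uint32(i), CellList: cellv}
		}
		return rowv
	}
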
@@ -978,7 +983,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp neo.Msg) {
 	// XXX check uuid matches NodeType

-	node = m.nodeTab.Get(uuid)
+	node = m.node.NodeTab.Get(uuid)
 	if node != nil {
 		// reject - uuid is already occupied by someone else
 		// XXX check also for down state - it could be the same node reconnecting
@@ -989,7 +994,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp neo.Msg) {
 	// XXX ok to have this logic inside identify? (better provide from outside ?)
 	switch nodeType {
 	case neo.CLIENT:
-		if m.clusterState != neo.ClusterRunning {
+		if m.node.ClusterState != neo.ClusterRunning {
 			return &neo.Error{neo.NOT_READY, "cluster not operational"}
 		}
@@ -1039,7 +1044,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp neo.Msg) {
 		IdTimestamp: m.monotime(),
 	}

-	node = m.nodeTab.Update(nodeInfo, n.conn) // NOTE this notifies all nodeTab subscribers
+	node = m.node.NodeTab.Update(nodeInfo, n.conn) // NOTE this notifies all nodeTab subscribers
 	return node, accept
 }
@@ -1080,7 +1085,7 @@ func (m *Master) accept(ctx context.Context, conn *neo.Conn, resp neo.Msg) error {
 func (m *Master) allocUUID(nodeType neo.NodeType) neo.NodeUUID {
 	for num := int32(1); num < 1<<24; num++ {
 		uuid := neo.UUID(nodeType, num)
-		if m.nodeTab.Get(uuid) == nil {
+		if m.node.NodeTab.Get(uuid) == nil {
 			return uuid
 		}
 	}
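
Note: the 1<<24 bound suggests neo.UUID packs the node type into the bits above a 24-bit per-type counter. A sketch under that assumption (the actual encoding is not shown in this commit):

	// sketch: pack node type and per-type number into one NodeUUID,
	// assuming num occupies the low 24 bits and the type sits above them.
	func UUID(typ NodeType, num int32) NodeUUID {
		return NodeUUID(int32(typ)<<24 | num)
	}
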
...
@@ -314,6 +314,8 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err error) {
 			return err
 		}

+		// XXX vvv move Send out of reply preparing logic
+
 		switch msg.(type) {
 		default:
 			return fmt.Errorf("unexpected message: %T", msg)
@@ -554,11 +556,6 @@ func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {

 // serveClient1 prepares response for 1 request from client
 func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Msg) {
-	// req, err := conn.Recv()
-	// if err != nil {
-	// 	return err // XXX log / err / send error before closing
-	// }
-
 	switch req := req.(type) {
 	case *neo.GetObject:
 		xid := zodb.Xid{Oid: req.Oid}
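
Note: with the commented-out Recv leftovers removed, serveClient1 is a pure request -> response function; the enclosing serveClient presumably owns all connection I/O. A sketch of that split (the loop shape is inferred, not shown in this commit):

	// sketch: serveClient drives conn I/O; serveClient1 only computes replies.
	func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
		for {
			req, err := conn.Recv()
			if err != nil {
				return // XXX log / send error before closing
			}
			resp := stor.serveClient1(ctx, req)
			if err := conn.Send(resp); err != nil {
				return // XXX log
			}
		}
	}
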
@@ -588,8 +585,6 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Msg) {
 			// XXX .DataSerial
 		}

-		// req.Reply(reply) // XXX err
-
 	case *neo.LastTransaction:
 		lastTid, err := stor.zstor.LastTid(ctx)
 		if err != nil {
@@ -598,8 +593,6 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Msg) {
 		return &neo.AnswerLastTransaction{lastTid}

-		// conn.Send(reply) // XXX err
-
 	//case *ObjectHistory:
 	//case *StoreObject:
...