Commit b9a07f51 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 9afb634d
...@@ -141,6 +141,7 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -141,6 +141,7 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
nodeCh, nodeUnsubscribe := m.nodeTab.SubscribeBuffered() nodeCh, nodeUnsubscribe := m.nodeTab.SubscribeBuffered()
_ = nodeUnsubscribe
//partCh, partUnsubscribe := m.partTab.SubscribeBuffered() //partCh, partUnsubscribe := m.partTab.SubscribeBuffered()
// TODO cluster subscribe // TODO cluster subscribe
//clusterCh := make(chan ClusterState) //clusterCh := make(chan ClusterState)
...@@ -168,9 +169,10 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -168,9 +169,10 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
case nodeUpdateV := <-nodeCh: case nodeUpdateV := <-nodeCh:
// TODO // TODO
_ = nodeUpdateV
case clusterState = <-clusterCh: //case clusterState = <-clusterCh:
changed = true // changed = true
} }
} }
}() }()
......
...@@ -73,7 +73,7 @@ type NodeTable struct { ...@@ -73,7 +73,7 @@ type NodeTable struct {
sync.RWMutex sync.RWMutex
nodev []*Node nodev []*Node
subscribev []chan *Node notifyv []chan NodeInfo // subscribers
ver int // ↑ for versioning XXX do we need this? ver int // ↑ for versioning XXX do we need this?
} }
...@@ -91,14 +91,14 @@ type Node struct { ...@@ -91,14 +91,14 @@ type Node struct {
// it returns a channel via which updates will be delivered and unsubscribe function // it returns a channel via which updates will be delivered and unsubscribe function
// //
// XXX locking: client for subscribe/unsubscribe XXX ok? // XXX locking: client for subscribe/unsubscribe XXX ok?
func (nt *NodeTable) Subscribe() (ch chan *Node, unsubscribe func()) { func (nt *NodeTable) Subscribe() (ch chan NodeInfo, unsubscribe func()) {
ch = make(chan *Node) // XXX how to specify ch buf size if needed ? ch = make(chan NodeInfo) // XXX how to specify ch buf size if needed ?
nt.subscribev = append(nt.subscribev, ch) nt.notifyv = append(nt.notifyv, ch)
unsubscribe = func() { unsubscribe = func() {
for i, c := range nt.subscribev { for i, c := range nt.notifyv {
if c == ch { if c == ch {
nt.subscribev = append(nt.subscribev[:i], nt.subscribev[i+1:]...) nt.notifyv = append(nt.notifyv[:i], nt.notifyv[i+1:]...)
close(ch) close(ch)
return return
} }
...@@ -117,12 +117,12 @@ func (nt *NodeTable) Subscribe() (ch chan *Node, unsubscribe func()) { ...@@ -117,12 +117,12 @@ func (nt *NodeTable) Subscribe() (ch chan *Node, unsubscribe func()) {
// to infinity - via e.g. detecting stuck connections and unsubscribing on shutdown // to infinity - via e.g. detecting stuck connections and unsubscribing on shutdown
// //
// XXX locking: client for subscribe/unsubscribe XXX ok? // XXX locking: client for subscribe/unsubscribe XXX ok?
func (nt *NodeTable) SubscribeBuffered() (ch chan []*Node, unsubscribe func()) { func (nt *NodeTable) SubscribeBuffered() (ch chan []NodeInfo, unsubscribe func()) {
in, unsubscribe := nt.Subscribe() in, unsubscribe := nt.Subscribe()
ch = make(chan []*Node) ch = make(chan []NodeInfo)
go func() { go func() {
var updatev []*Node var updatev []NodeInfo
shutdown := false shutdown := false
for { for {
...@@ -175,10 +175,10 @@ func (nt *NodeTable) Add(node *Node) { ...@@ -175,10 +175,10 @@ func (nt *NodeTable) Add(node *Node) {
// Lookup finds node by nodeID // Lookup finds node by nodeID
func (nt *NodeTable) Lookup(nodeID NodeID) *Node { func (nt *NodeTable) Lookup(uuid NodeUUID) *Node {
// FIXME linear scan // FIXME linear scan
for _, node := range nt.nodev { for _, node := range nt.nodev {
if node.Info.NodeID == nodeID { if node.Info.NodeUUID == uuid {
return node return node
} }
} }
...@@ -197,7 +197,7 @@ func (nt *NodeTable) String() string { ...@@ -197,7 +197,7 @@ func (nt *NodeTable) String() string {
for _, node := range nt.nodev { for _, node := range nt.nodev {
// XXX recheck output // XXX recheck output
i := node.Info i := node.Info
fmt.Fprintf(&buf, "%s (%s)\t%s\t%s\n", i.NodeID, i.NodeType, i.NodeState, i.Address) fmt.Fprintf(&buf, "%s (%s)\t%s\t%s\n", i.NodeUUID, i.NodeType, i.NodeState, i.Address)
} }
return buf.String() return buf.String()
......
...@@ -114,7 +114,7 @@ type PartitionTable struct { ...@@ -114,7 +114,7 @@ type PartitionTable struct {
// PartitionCell describes one storage in a ptid entry in partition table // PartitionCell describes one storage in a ptid entry in partition table
type PartitionCell struct { type PartitionCell struct {
NodeID NodeUUID
CellState CellState
// XXX ? + .haveUpToTid associated node has data up to such tid // XXX ? + .haveUpToTid associated node has data up to such tid
...@@ -135,7 +135,7 @@ type PartitionCell struct { ...@@ -135,7 +135,7 @@ type PartitionCell struct {
// for cluster to be really operational it has to be checked whether // for cluster to be really operational it has to be checked whether
// nodes referenced by pt are up and running // nodes referenced by pt are up and running
// //
// XXX or keep not only NodeID in PartitionCell - add *Node ? // XXX or keep not only NodeUUID in PartitionCell - add *Node ?
func (pt *PartitionTable) Operational() bool { func (pt *PartitionTable) Operational() bool {
for _, ptEntry := range pt.ptTab { for _, ptEntry := range pt.ptTab {
if len(ptEntry) == 0 { if len(ptEntry) == 0 {
......
This diff is collapsed.
...@@ -20,14 +20,14 @@ func (e *Error) Error() string { ...@@ -20,14 +20,14 @@ func (e *Error) Error() string {
const nodeTypeChar = "MSCA4567" const nodeTypeChar = "MSCA4567"
func (nid NodeID) String() string { func (nodeUUID NodeUUID) String() string {
// return ex 'S1', 'M2', ... // return ex 'S1', 'M2', ...
if nid == 0 { if nodeUUID == 0 {
return "?0" return "?0"
} }
typ := nid >> 24 typ := nodeUUID >> 24
num := nid & (1<<24 - 1) num := nodeUUID & (1<<24 - 1)
temp := typ&(1 << 7) != 0 temp := typ&(1 << 7) != 0
typ &= 1<<7 - 1 typ &= 1<<7 - 1
......
...@@ -118,7 +118,7 @@ const ( ...@@ -118,7 +118,7 @@ const (
CORRUPTED //short: C CORRUPTED //short: C
) )
// NodeID is a node identifier, 4-bytes signed integer // NodeUUID is a node identifier, 4-bytes signed integer
// //
// High-order byte: // High-order byte:
// 7 6 5 4 3 2 1 0 // 7 6 5 4 3 2 1 0
...@@ -131,9 +131,9 @@ const ( ...@@ -131,9 +131,9 @@ const (
// Extra namespace information and non-randomness of 3 LOB help to read logs. // Extra namespace information and non-randomness of 3 LOB help to read logs.
// //
// TODO -> back to 16-bytes randomly generated UUID // TODO -> back to 16-bytes randomly generated UUID
type NodeID int32 type NodeUUID int32
// TODO NodeType -> base NodeID // TODO NodeType -> base NodeUUID
var ErrDecodeOverflow = errors.New("decode: bufer overflow") var ErrDecodeOverflow = errors.New("decode: bufer overflow")
...@@ -232,18 +232,17 @@ func float64_NEODecode(b []byte) float64 { ...@@ -232,18 +232,17 @@ func float64_NEODecode(b []byte) float64 {
type NodeInfo struct { type NodeInfo struct {
NodeType NodeType
Address // serving address Address // serving address
NodeID NodeUUID
NodeState NodeState
IdTimestamp float64 IdTimestamp float64
} }
// XXX -> parttab.go ? // XXX -> parttab.go ?
type CellInfo struct { type CellInfo struct {
NodeID NodeUUID
CellState CellState
} }
//type RowList []struct {
type RowInfo struct { type RowInfo struct {
Offset uint32 // PNumber Offset uint32 // PNumber
CellList []CellInfo CellList []CellInfo
...@@ -272,7 +271,7 @@ type CloseClient struct { ...@@ -272,7 +271,7 @@ type CloseClient struct {
// connection. Any -> Any. // connection. Any -> Any.
type RequestIdentification struct { type RequestIdentification struct {
NodeType NodeType // XXX name NodeType NodeType // XXX name
NodeID NodeID NodeUUID NodeUUID
Address Address // where requesting node is also accepting connections Address Address // where requesting node is also accepting connections
Name string // XXX -> ClusterName Name string // XXX -> ClusterName
IdTimestamp float64 IdTimestamp float64
...@@ -281,10 +280,10 @@ type RequestIdentification struct { ...@@ -281,10 +280,10 @@ type RequestIdentification struct {
// XXX -> ReplyIdentification? RequestIdentification.Answer somehow ? // XXX -> ReplyIdentification? RequestIdentification.Answer somehow ?
type AcceptIdentification struct { type AcceptIdentification struct {
NodeType NodeType // XXX name NodeType NodeType // XXX name
MyNodeID NodeID MyNodeUUID NodeUUID
NumPartitions uint32 // PNumber NumPartitions uint32 // PNumber
NumReplicas uint32 // PNumber NumReplicas uint32 // PNumber
YourNodeID NodeID YourNodeUUID NodeUUID
} }
// Ask current primary master's uuid. CTL -> A. // Ask current primary master's uuid. CTL -> A.
...@@ -292,12 +291,12 @@ type PrimaryMaster struct { ...@@ -292,12 +291,12 @@ type PrimaryMaster struct {
} }
type AnswerPrimary struct { type AnswerPrimary struct {
PrimaryNodeID NodeID PrimaryNodeUUID NodeUUID
} }
// Send list of known master nodes. SM -> Any. // Send list of known master nodes. SM -> Any.
type NotPrimaryMaster struct { type NotPrimaryMaster struct {
Primary NodeID // XXX PSignedNull in py Primary NodeUUID // XXX PSignedNull in py
KnownMasterList []struct { KnownMasterList []struct {
Address Address
} }
...@@ -348,7 +347,7 @@ type PartitionChanges struct { ...@@ -348,7 +347,7 @@ type PartitionChanges struct {
CellList []struct { CellList []struct {
// XXX does below correlate with Cell inside top-level CellList ? // XXX does below correlate with Cell inside top-level CellList ?
Offset uint32 // PNumber Offset uint32 // PNumber
NodeID NodeID NodeUUID NodeUUID
CellState CellState CellState CellState
} }
} }
...@@ -421,7 +420,7 @@ type AnswerBeginTransaction struct { ...@@ -421,7 +420,7 @@ type AnswerBeginTransaction struct {
// True is returned if it's still possible to finish the transaction. // True is returned if it's still possible to finish the transaction.
type FailedVote struct { type FailedVote struct {
Tid zodb.Tid Tid zodb.Tid
NodeList []NodeID NodeList []NodeUUID
// XXX _answer = Error // XXX _answer = Error
} }
...@@ -536,7 +535,7 @@ type AnswerStoreObject struct { ...@@ -536,7 +535,7 @@ type AnswerStoreObject struct {
// Abort a transaction. C -> S and C -> PM -> S. // Abort a transaction. C -> S and C -> PM -> S.
type AbortTransaction struct { type AbortTransaction struct {
Tid zodb.Tid Tid zodb.Tid
NodeList []NodeID // unused for * -> S NodeList []NodeUUID // unused for * -> S
} }
// Ask to store a transaction. C -> S. // Ask to store a transaction. C -> S.
...@@ -645,7 +644,7 @@ type AnswerObjectHistory struct { ...@@ -645,7 +644,7 @@ type AnswerObjectHistory struct {
type PartitionList struct { type PartitionList struct {
MinOffset uint32 // PNumber MinOffset uint32 // PNumber
MaxOffset uint32 // PNumber MaxOffset uint32 // PNumber
NodeID NodeID NodeUUID NodeUUID
} }
type AnswerPartitionList struct { type AnswerPartitionList struct {
...@@ -667,7 +666,7 @@ type AnswerNodeList struct { ...@@ -667,7 +666,7 @@ type AnswerNodeList struct {
// Set the node state // Set the node state
type SetNodeState struct { type SetNodeState struct {
NodeID NodeUUID
NodeState NodeState
// XXX _answer = Error ? // XXX _answer = Error ?
...@@ -675,14 +674,14 @@ type SetNodeState struct { ...@@ -675,14 +674,14 @@ type SetNodeState struct {
// Ask the primary to include some pending node in the partition table // Ask the primary to include some pending node in the partition table
type AddPendingNodes struct { type AddPendingNodes struct {
NodeList []NodeID NodeList []NodeUUID
// XXX _answer = Error // XXX _answer = Error
} }
// Ask the primary to optimize the partition table. A -> PM. // Ask the primary to optimize the partition table. A -> PM.
type TweakPartitionTable struct { type TweakPartitionTable struct {
NodeList []NodeID NodeList []NodeUUID
// XXX _answer = Error // XXX _answer = Error
} }
...@@ -715,7 +714,7 @@ type repairFlags struct { ...@@ -715,7 +714,7 @@ type repairFlags struct {
// Ask storage nodes to repair their databases. ctl -> A -> M // Ask storage nodes to repair their databases. ctl -> A -> M
type Repair struct { type Repair struct {
NodeList []NodeID NodeList []NodeUUID
repairFlags repairFlags
} }
...@@ -803,7 +802,7 @@ type AnswerPack struct { ...@@ -803,7 +802,7 @@ type AnswerPack struct {
// ctl -> A // ctl -> A
// A -> M // A -> M
type CheckReplicas struct { type CheckReplicas struct {
PartitionDict map[uint32]NodeID // partition -> source (PNumber) PartitionDict map[uint32]NodeUUID // partition -> source (PNumber)
MinTID zodb.Tid MinTID zodb.Tid
MaxTID zodb.Tid MaxTID zodb.Tid
...@@ -870,7 +869,7 @@ type AnswerCheckSerialRange struct { ...@@ -870,7 +869,7 @@ type AnswerCheckSerialRange struct {
// S -> M // S -> M
type PartitionCorrupted struct { type PartitionCorrupted struct {
Partition uint32 // PNumber Partition uint32 // PNumber
CellList []NodeID CellList []NodeUUID
} }
......
...@@ -139,10 +139,10 @@ func IdentifyPeer(link *NodeLink, myNodeType NodeType) (nodeInfo RequestIdentifi ...@@ -139,10 +139,10 @@ func IdentifyPeer(link *NodeLink, myNodeType NodeType) (nodeInfo RequestIdentifi
err = EncodeAndSend(conn, &AcceptIdentification{ err = EncodeAndSend(conn, &AcceptIdentification{
NodeType: myNodeType, NodeType: myNodeType,
MyNodeID: 0, // XXX MyNodeUUID: 0, // XXX
NumPartitions: 0, // XXX NumPartitions: 0, // XXX
NumReplicas: 0, // XXX NumReplicas: 0, // XXX
YourNodeID: pkt.NodeID, YourNodeUUID: pkt.NodeUUID,
}) })
if err != nil { if err != nil {
...@@ -181,7 +181,7 @@ func IdentifyMe(link *NodeLink, nodeType NodeType /*XXX*/) (peerType NodeType, e ...@@ -181,7 +181,7 @@ func IdentifyMe(link *NodeLink, nodeType NodeType /*XXX*/) (peerType NodeType, e
err = EncodeAndSend(conn, &RequestIdentification{ err = EncodeAndSend(conn, &RequestIdentification{
NodeType: nodeType, NodeType: nodeType,
NodeID: 0, // XXX NodeUUID: 0, // XXX
Address: Address{}, // XXX Address: Address{}, // XXX
Name: "", // XXX cluster name ? Name: "", // XXX cluster name ?
IdTimestamp: 0, // XXX IdTimestamp: 0, // XXX
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment